Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Add timeout for waiting on compactor to become ACTIVE in the ring. #4262

Merged
merged 17 commits into from
Jul 5, 2021
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
* [BUGFIX] Store-gateway: when blocks sharding is enabled, do not load all blocks in each store-gateway in case of a cold startup, but load only blocks owned by the store-gateway replica. #4271
* [BUGFIX] Memberlist: fix to setting the default configuration value for `-memberlist.retransmit-factor` when not provided. This should improve propagation delay of the ring state (including, but not limited to, tombstones). Note that if the configuration is already explicitly given, this fix has no effect. #4269
* [BUGFIX] Querier: Fix issue where samples in a chunk might get skipped by batch iterator. #4218
* [CHANGE] Add timeout for waiting on compactor to become ACTIVE in the ring. #4262

## Blocksconvert

Expand Down
4 changes: 4 additions & 0 deletions docs/blocks-storage/compactor.md
Original file line number Diff line number Diff line change
Expand Up @@ -230,4 +230,8 @@ compactor:
# Name of network interface to read address from.
# CLI flag: -compactor.ring.instance-interface-names
[instance_interface_names: <list of string> | default = [eth0 en0]]

# Timeout for compactor to become ACTIVE in the ring
# CLI flag: -compactor.ring.starting-timeout
[starting_timeout: <duration> | default = 10m]
```
4 changes: 4 additions & 0 deletions pkg/compactor/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,11 @@ func (c *Compactor) starting(ctx context.Context) error {
// users scanner depends on the ring (to check whether an user belongs
// to this shard or not).
level.Info(c.logger).Log("msg", "waiting until compactor is ACTIVE in the ring")

ctx, cancel := context.WithTimeout(ctx, c.compactorCfg.ShardingRing.StartingTimeout)
ac1214 marked this conversation as resolved.
Show resolved Hide resolved
defer cancel()
if err := ring.WaitInstanceState(ctx, c.ring, c.ringLifecycler.ID, ring.ACTIVE); err != nil {
level.Error(c.logger).Log("msg", "compactor failed to become ACTIVE in the ring", "err", err)
return err
}
level.Info(c.logger).Log("msg", "compactor is ACTIVE in the ring")
Expand Down
5 changes: 5 additions & 0 deletions pkg/compactor/compactor_ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ type RingConfig struct {

// Injected internally
ListenPort int `yaml:"-"`

StartingTimeout time.Duration `yaml:"starting_timeout"`
}

// RegisterFlags adds the flags required to config this to the given FlagSet
Expand All @@ -59,6 +61,9 @@ func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet) {
f.StringVar(&cfg.InstanceAddr, "compactor.ring.instance-addr", "", "IP address to advertise in the ring.")
f.IntVar(&cfg.InstancePort, "compactor.ring.instance-port", 0, "Port to advertise in the ring (defaults to server.grpc-listen-port).")
f.StringVar(&cfg.InstanceID, "compactor.ring.instance-id", hostname, "Instance ID to register in the ring.")

// Timeout durations
f.DurationVar(&cfg.StartingTimeout, "compactor.ring.starting-timeout", 10*time.Minute, "Timeout for waiting on compactor to become ACTIVE in the ring")
ac1214 marked this conversation as resolved.
Show resolved Hide resolved
ac1214 marked this conversation as resolved.
Show resolved Hide resolved
}

// ToLifecyclerConfig returns a LifecyclerConfig based on the compactor
Expand Down
29 changes: 29 additions & 0 deletions pkg/compactor/compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1092,6 +1092,9 @@ func prepareConfig() Config {
compactorCfg.ShardingRing.WaitStabilityMinDuration = 0
compactorCfg.ShardingRing.WaitStabilityMaxDuration = 0

// Set lower timeout for waiting on compactor to become ACTIVE in the ring for unit tests
compactorCfg.ShardingRing.StartingTimeout = 5 * time.Second

return compactorCfg
}

Expand Down Expand Up @@ -1279,3 +1282,29 @@ func TestCompactor_DeleteLocalSyncFiles(t *testing.T) {
require.NotEqual(t, numUsers, c1Users)
require.Equal(t, numUsers, c1Users+c2Users)
}

func TestCompactor_ShouldFailCompactionOnTimeout(t *testing.T) {
t.Parallel()

// Mock the bucket
bucketClient := &bucket.ClientMock{}

cfg := prepareConfig()
cfg.ShardingEnabled = true
cfg.ShardingRing.InstanceID = "compactor-1"
cfg.ShardingRing.InstanceAddr = "1.2.3.4"
cfg.ShardingRing.KVStore.Mock = consul.NewBadInMemoryClient(ring.GetCodec())

c, _, _, logs, _ := prepare(t, cfg, bucketClient)

// Try to start the compactor with a bad consul kv-store. The
err := services.StartAndAwaitRunning(context.Background(), c)

// Assert that the compactor timesout
ac1214 marked this conversation as resolved.
Show resolved Hide resolved
assert.Equal(t, context.DeadlineExceeded, err)

assert.ElementsMatch(t, []string{
`level=info component=compactor msg="waiting until compactor is ACTIVE in the ring"`,
`level=error component=compactor msg="compactor failed to become ACTIVE in the ring" err="context deadline exceeded"`,
}, removeIgnoredLogs(strings.Split(strings.TrimSpace(logs.String()), "\n")))
}
36 changes: 36 additions & 0 deletions pkg/ring/kv/consul/mock.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package consul

import (
"errors"
"fmt"
"strings"
"sync"
Expand Down Expand Up @@ -228,3 +229,38 @@ func mockedMaxWaitTime(queryWaitTime time.Duration) time.Duration {

return queryWaitTime
}

// mockBadKV is a mockKV variant whose CAS always fails, used to exercise
// error paths in components that write their state to the ring KV store.
type mockBadKV struct {
	mockKV
}

// NewBadInMemoryClient makes a new mock consul client whose CAS operation
// always fails, for testing error handling around ring updates.
func NewBadInMemoryClient(codec codec.Codec) *Client {
	// The previous implementation built a throwaway mockBadKV here and then
	// discarded it; the client is fully constructed by the call below.
	return NewBadInMemoryClientWithConfig(codec, Config{})
}

// NewBadInMemoryClientWithConfig makes a new mock consul client with the
// supplied Config and a failing CAS method for tests.
func NewBadInMemoryClientWithConfig(codec codec.Codec, cfg Config) *Client {
	kv := &mockBadKV{}
	kv.kvps = map[string]*consul.KVPair{}
	// Always start from 1, we NEVER want to report back index 0 in the responses.
	// This is in line with Consul, and our new checks for index return value in client.go.
	kv.current = 1
	kv.cond = sync.NewCond(&kv.mtx)
	go kv.loop()

	return &Client{
		kv:    kv,
		codec: codec,
		cfg:   cfg,
	}
}

// CAS always fails, simulating a KV store that rejects every
// compare-and-swap so that callers' error paths can be exercised.
func (m *mockBadKV) CAS(p *consul.KVPair, q *consul.WriteOptions) (bool, *consul.WriteMeta, error) {
	// Error strings are lowercase by Go convention (staticcheck ST1005);
	// the CAS initialism keeps its case.
	return false, nil, errors.New("CAS error")
}
4 changes: 4 additions & 0 deletions pkg/ring/ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ type ReadRing interface {
// and size (number of instances).
ShuffleShard(identifier string, size int) ReadRing

// GetInstanceState returns the current state of an instance or an error if the
// instance does not exist in the ring.
GetInstanceState(instanceID string) (InstanceState, error)

// ShuffleShardWithLookback is like ShuffleShard() but the returned subring includes
// all instances that have been part of the identifier's shard since "now - lookbackPeriod".
ShuffleShardWithLookback(identifier string, size int, lookbackPeriod time.Duration, now time.Time) ReadRing
Expand Down
2 changes: 1 addition & 1 deletion pkg/ring/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func GetInstancePort(configPort, listenPort int) int {

// WaitInstanceState waits until the input instanceID is registered within the
// ring matching the provided state. A timeout should be provided within the context.
func WaitInstanceState(ctx context.Context, r *Ring, instanceID string, state InstanceState) error {
func WaitInstanceState(ctx context.Context, r ReadRing, instanceID string, state InstanceState) error {
backoff := util.NewBackoff(ctx, util.BackoffConfig{
MinBackoff: 100 * time.Millisecond,
MaxBackoff: time.Second,
Expand Down
115 changes: 115 additions & 0 deletions pkg/ring/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,65 @@ import (
"testing"
"time"

"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)

// RingMock is a testify-based mock of the ReadRing interface, letting tests
// script ring responses via RingMock.On(...) without running a real ring.
type RingMock struct {
	mock.Mock
}

// Collect implements prometheus.Collector; no-op for the mock.
func (r *RingMock) Collect(ch chan<- prometheus.Metric) {}

// Describe implements prometheus.Collector; no-op for the mock.
func (r *RingMock) Describe(ch chan<- *prometheus.Desc) {}

// Get returns whatever the test registered via r.On("Get", ...).
func (r *RingMock) Get(key uint32, op Operation, bufDescs []InstanceDesc, bufHosts, bufZones []string) (ReplicationSet, error) {
	args := r.Called(key, op, bufDescs, bufHosts, bufZones)
	return args.Get(0).(ReplicationSet), args.Error(1)
}

// GetAllHealthy returns whatever the test registered via r.On("GetAllHealthy", ...).
func (r *RingMock) GetAllHealthy(op Operation) (ReplicationSet, error) {
	args := r.Called(op)
	return args.Get(0).(ReplicationSet), args.Error(1)
}

// GetReplicationSetForOperation returns whatever the test registered via
// r.On("GetReplicationSetForOperation", ...).
func (r *RingMock) GetReplicationSetForOperation(op Operation) (ReplicationSet, error) {
	args := r.Called(op)
	return args.Get(0).(ReplicationSet), args.Error(1)
}

// ReplicationFactor always reports 0 for the mock.
func (r *RingMock) ReplicationFactor() int {
	return 0
}

// InstancesCount always reports 0 for the mock.
func (r *RingMock) InstancesCount() int {
	return 0
}

// ShuffleShard returns whatever subring the test registered via r.On("ShuffleShard", ...).
func (r *RingMock) ShuffleShard(identifier string, size int) ReadRing {
	args := r.Called(identifier, size)
	return args.Get(0).(ReadRing)
}

// GetInstanceState returns the state/error pair the test registered via
// r.On("GetInstanceState", ...). Note the method takes a single argument,
// so expectations must register exactly one matcher.
func (r *RingMock) GetInstanceState(instanceID string) (InstanceState, error) {
	args := r.Called(instanceID)
	return args.Get(0).(InstanceState), args.Error(1)
}

// ShuffleShardWithLookback returns whatever subring the test registered via
// r.On("ShuffleShardWithLookback", ...).
func (r *RingMock) ShuffleShardWithLookback(identifier string, size int, lookbackPeriod time.Duration, now time.Time) ReadRing {
	args := r.Called(identifier, size, lookbackPeriod, now)
	return args.Get(0).(ReadRing)
}

// HasInstance always reports true for the mock.
func (r *RingMock) HasInstance(instanceID string) bool {
	return true
}

// CleanupShuffleShardCache is a no-op for the mock.
func (r *RingMock) CleanupShuffleShardCache(identifier string) {}

func TestGenerateTokens(t *testing.T) {
tokens := GenerateTokens(1000000, nil)

Expand Down Expand Up @@ -184,3 +239,63 @@ func TestWaitRingStabilityShouldReturnErrorIfMaxWaitingIsReached(t *testing.T) {

assert.InDelta(t, maxWaiting, elapsedTime, float64(2*time.Second))
}

func TestWaitInstanceStateTimeout(t *testing.T) {
t.Parallel()

const (
instanceID = "test"
timeoutDuration = time.Duration(3)
ac1214 marked this conversation as resolved.
Show resolved Hide resolved
)

ctx, cancel := context.WithTimeout(context.Background(), timeoutDuration*time.Second)
defer cancel()

ring := &RingMock{}
ring.On("GetInstanceState", mock.Anything, mock.Anything).Return(ACTIVE, nil)

err := WaitInstanceState(ctx, ring, instanceID, PENDING)

assert.Equal(t, context.DeadlineExceeded, err)
ring.AssertCalled(t, "GetInstanceState", instanceID)
}

// TestWaitInstanceStateTimeoutOnError verifies that WaitInstanceState retries
// on lookup errors and returns context.DeadlineExceeded once the context
// expires, rather than surfacing the transient error.
func TestWaitInstanceStateTimeoutOnError(t *testing.T) {
	t.Parallel()

	const (
		instanceID      = "test"
		timeoutDuration = 3 * time.Second
	)

	ctx, cancel := context.WithTimeout(context.Background(), timeoutDuration)
	defer cancel()

	ring := &RingMock{}
	// GetInstanceState takes a single argument, so register exactly one
	// matcher; a second mock.Anything would prevent the call from matching.
	ring.On("GetInstanceState", mock.Anything).Return(PENDING, errors.New("instance not found in the ring"))

	err := WaitInstanceState(ctx, ring, instanceID, ACTIVE)

	assert.Equal(t, context.DeadlineExceeded, err)
	ring.AssertCalled(t, "GetInstanceState", instanceID)
}

// TestWaitInstanceStateExitsAfterActualStateEqualsState verifies that
// WaitInstanceState returns immediately (after a single ring lookup) once the
// instance is already in the requested state.
func TestWaitInstanceStateExitsAfterActualStateEqualsState(t *testing.T) {
	t.Parallel()

	const (
		instanceID      = "test"
		timeoutDuration = 3 * time.Second
	)

	ctx, cancel := context.WithTimeout(context.Background(), timeoutDuration)
	defer cancel()

	ring := &RingMock{}
	// GetInstanceState takes a single argument, so register exactly one
	// matcher; a second mock.Anything would prevent the call from matching.
	ring.On("GetInstanceState", mock.Anything).Return(ACTIVE, nil)

	err := WaitInstanceState(ctx, ring, instanceID, ACTIVE)

	assert.Nil(t, err)
	ring.AssertNumberOfCalls(t, "GetInstanceState", 1)
}