Commit
Add timeout for waiting on compactor to become ACTIVE in the ring. (#4262)

* add MaxRetries to WaitInstanceState

Signed-off-by: Albert <[email protected]>

* update CHANGELOG.md

Signed-off-by: Albert <[email protected]>

* Add timeout for waiting on compactor to become ACTIVE in the ring.

Signed-off-by: Albert <[email protected]>

* add MaxRetries variable back to WaitInstanceState

Signed-off-by: Albert <[email protected]>

* Fix linting issues

Signed-off-by: Albert <[email protected]>

* Remove duplicate entry from changelog

Signed-off-by: Albert <[email protected]>

* Address PR comments and set timeout to be configurable

Signed-off-by: Albert <[email protected]>

* Address PR comments and fix tests

Signed-off-by: Albert <[email protected]>

* Update unit tests

Signed-off-by: Albert <[email protected]>

* Update changelog and fix linting

Signed-off-by: Albert <[email protected]>

* Fixed CHANGELOG entry order

Signed-off-by: Marco Pracucci <[email protected]>

Co-authored-by: Albert <[email protected]>
Co-authored-by: Marco Pracucci <[email protected]>
3 people authored Jul 5, 2021
1 parent ff0d1a6 commit b4af68a
Showing 9 changed files with 175 additions and 3 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -3,6 +3,7 @@
## master / unreleased

* [CHANGE] Querier / ruler: Change `-querier.max-fetched-chunks-per-query` configuration to limit the maximum number of chunks that can be fetched in a single query. The number of chunks fetched by ingesters AND long-term storage combined should not exceed the value configured on `-querier.max-fetched-chunks-per-query`. #4260
* [ENHANCEMENT] Add timeout for waiting on compactor to become ACTIVE in the ring. #4262
* [BUGFIX] HA Tracker: when cleaning up obsolete elected replicas from the KV store, the tracker didn't update the number of clusters per user correctly. #4336

## 1.10.0-rc.0 / 2021-06-28
4 changes: 4 additions & 0 deletions docs/blocks-storage/compactor.md
@@ -230,4 +230,8 @@ compactor:
# Name of network interface to read address from.
# CLI flag: -compactor.ring.instance-interface-names
[instance_interface_names: <list of string> | default = [eth0 en0]]

# Timeout for waiting on compactor to become ACTIVE in the ring.
# CLI flag: -compactor.ring.wait-active-instance-timeout
[wait_active_instance_timeout: <duration> | default = 10m]
```
4 changes: 4 additions & 0 deletions docs/configuration/config-file-reference.md
@@ -5195,6 +5195,10 @@ sharding_ring:
# Name of network interface to read address from.
# CLI flag: -compactor.ring.instance-interface-names
[instance_interface_names: <list of string> | default = [eth0 en0]]
# Timeout for waiting on compactor to become ACTIVE in the ring.
# CLI flag: -compactor.ring.wait-active-instance-timeout
[wait_active_instance_timeout: <duration> | default = 10m]
```

### `store_gateway_config`
6 changes: 5 additions & 1 deletion pkg/compactor/compactor.go
@@ -393,7 +393,11 @@ func (c *Compactor) starting(ctx context.Context) error {
// users scanner depends on the ring (to check whether a user belongs
// to this shard or not).
level.Info(c.logger).Log("msg", "waiting until compactor is ACTIVE in the ring")
-if err := ring.WaitInstanceState(ctx, c.ring, c.ringLifecycler.ID, ring.ACTIVE); err != nil {
+
+ctxWithTimeout, cancel := context.WithTimeout(ctx, c.compactorCfg.ShardingRing.WaitActiveInstanceTimeout)
+defer cancel()
+if err := ring.WaitInstanceState(ctxWithTimeout, c.ring, c.ringLifecycler.ID, ring.ACTIVE); err != nil {
+level.Error(c.logger).Log("msg", "compactor failed to become ACTIVE in the ring", "err", err)
return err
}
level.Info(c.logger).Log("msg", "compactor is ACTIVE in the ring")
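The change above bounds what was previously an unbounded wait. A minimal, self-contained sketch of the same pattern — `waitActive` and its polling loop are hypothetical stand-ins for `ring.WaitInstanceState`, not Cortex code:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// waitActive polls check() until it returns true or the context expires.
func waitActive(ctx context.Context, check func() bool) error {
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()
	for {
		if check() {
			return nil
		}
		select {
		case <-ctx.Done():
			return ctx.Err() // context.DeadlineExceeded once the timeout fires
		case <-ticker.C:
		}
	}
}

func main() {
	// Same shape as the compactor change: derive a bounded context from the parent.
	ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
	defer cancel()

	err := waitActive(ctx, func() bool { return false })  // instance never becomes ACTIVE
	fmt.Println(errors.Is(err, context.DeadlineExceeded)) // prints: true
}
```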
9 changes: 8 additions & 1 deletion pkg/compactor/compactor_ring.go
@@ -34,6 +34,10 @@ type RingConfig struct {

// Injected internally
ListenPort int `yaml:"-"`

WaitActiveInstanceTimeout time.Duration `yaml:"wait_active_instance_timeout"`

ObservePeriod time.Duration `yaml:"-"`
}

// RegisterFlags adds the flags required to config this to the given FlagSet
@@ -59,6 +63,9 @@ func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet) {
f.StringVar(&cfg.InstanceAddr, "compactor.ring.instance-addr", "", "IP address to advertise in the ring.")
f.IntVar(&cfg.InstancePort, "compactor.ring.instance-port", 0, "Port to advertise in the ring (defaults to server.grpc-listen-port).")
f.StringVar(&cfg.InstanceID, "compactor.ring.instance-id", hostname, "Instance ID to register in the ring.")

// Timeout durations
f.DurationVar(&cfg.WaitActiveInstanceTimeout, "compactor.ring.wait-active-instance-timeout", 10*time.Minute, "Timeout for waiting on compactor to become ACTIVE in the ring.")
}

// ToLifecyclerConfig returns a LifecyclerConfig based on the compactor
@@ -87,7 +94,7 @@ func (cfg *RingConfig) ToLifecyclerConfig() ring.LifecyclerConfig {
lc.InfNames = cfg.InstanceInterfaceNames
lc.UnregisterOnShutdown = true
lc.HeartbeatPeriod = cfg.HeartbeatPeriod
-lc.ObservePeriod = 0
+lc.ObservePeriod = cfg.ObservePeriod
lc.JoinAfter = 0
lc.MinReadyDuration = 0
lc.FinalSleep = 0
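The flag registered above accepts any Go duration string. A standalone sketch — the `FlagSet` here is illustrative, not wired into Cortex — showing how the new flag parses and what the documented `10m` default means:

```go
package main

import (
	"flag"
	"fmt"
	"time"
)

func main() {
	fs := flag.NewFlagSet("compactor", flag.ExitOnError)

	var timeout time.Duration
	// Same flag name and default as the RegisterFlags call above.
	fs.DurationVar(&timeout, "compactor.ring.wait-active-instance-timeout", 10*time.Minute,
		"Timeout for waiting on compactor to become ACTIVE in the ring.")

	// e.g. an operator overriding the default on the command line.
	_ = fs.Parse([]string{"-compactor.ring.wait-active-instance-timeout=5m"})
	fmt.Println(timeout) // prints: 5m0s
}
```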
33 changes: 33 additions & 0 deletions pkg/compactor/compactor_test.go
@@ -1092,6 +1092,9 @@ func prepareConfig() Config {
compactorCfg.ShardingRing.WaitStabilityMinDuration = 0
compactorCfg.ShardingRing.WaitStabilityMaxDuration = 0

// Set lower timeout for waiting on compactor to become ACTIVE in the ring for unit tests
compactorCfg.ShardingRing.WaitActiveInstanceTimeout = 5 * time.Second

return compactorCfg
}

@@ -1279,3 +1282,33 @@ func TestCompactor_DeleteLocalSyncFiles(t *testing.T) {
require.NotEqual(t, numUsers, c1Users)
require.Equal(t, numUsers, c1Users+c2Users)
}

func TestCompactor_ShouldFailCompactionOnTimeout(t *testing.T) {
t.Parallel()

// Mock the bucket
bucketClient := &bucket.ClientMock{}
bucketClient.MockIter("", []string{}, nil)

cfg := prepareConfig()
cfg.ShardingEnabled = true
cfg.ShardingRing.InstanceID = "compactor-1"
cfg.ShardingRing.InstanceAddr = "1.2.3.4"
cfg.ShardingRing.KVStore.Mock = consul.NewInMemoryClient(ring.GetCodec())

// Set ObservePeriod to longer than the timeout period to simulate a timeout while waiting for the instance to become ACTIVE in the ring
cfg.ShardingRing.ObservePeriod = time.Second * 10

c, _, _, logs, _ := prepare(t, cfg, bucketClient)

// Try to start the compactor. The long observe period should keep it from becoming ACTIVE before the timeout expires.
err := services.StartAndAwaitRunning(context.Background(), c)

// Assert that the compactor timed out
assert.Equal(t, context.DeadlineExceeded, err)

assert.ElementsMatch(t, []string{
`level=info component=compactor msg="waiting until compactor is ACTIVE in the ring"`,
`level=error component=compactor msg="compactor failed to become ACTIVE in the ring" err="context deadline exceeded"`,
}, removeIgnoredLogs(strings.Split(strings.TrimSpace(logs.String()), "\n")))
}
4 changes: 4 additions & 0 deletions pkg/ring/ring.go
@@ -71,6 +71,10 @@ type ReadRing interface {
// and size (number of instances).
ShuffleShard(identifier string, size int) ReadRing

// GetInstanceState returns the current state of an instance or an error if the
// instance does not exist in the ring.
GetInstanceState(instanceID string) (InstanceState, error)

// ShuffleShardWithLookback is like ShuffleShard() but the returned subring includes
// all instances that have been part of the identifier's shard since "now - lookbackPeriod".
ShuffleShardWithLookback(identifier string, size int, lookbackPeriod time.Duration, now time.Time) ReadRing
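Exposing `GetInstanceState` on the `ReadRing` interface is what lets `WaitInstanceState` (next file) accept an interface rather than the concrete `*Ring`, so tests can substitute a mock. A pared-down illustration of that design choice, with simplified types that are not the real ring package:

```go
package main

import "fmt"

type InstanceState int

const (
	PENDING InstanceState = iota
	ACTIVE
)

// reader is a minimal stand-in for ReadRing: only the method callers need.
type reader interface {
	GetInstanceState(instanceID string) (InstanceState, error)
}

// fakeRing satisfies reader, much as RingMock satisfies ReadRing in the tests below.
type fakeRing struct{ state InstanceState }

func (f fakeRing) GetInstanceState(string) (InstanceState, error) { return f.state, nil }

func isActive(r reader, id string) bool {
	s, err := r.GetInstanceState(id)
	return err == nil && s == ACTIVE
}

func main() {
	fmt.Println(isActive(fakeRing{state: ACTIVE}, "compactor-1")) // prints: true
}
```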
2 changes: 1 addition & 1 deletion pkg/ring/util.go
@@ -69,7 +69,7 @@ func GetInstancePort(configPort, listenPort int) int {

// WaitInstanceState waits until the input instanceID is registered within the
// ring matching the provided state. A timeout should be provided within the context.
-func WaitInstanceState(ctx context.Context, r *Ring, instanceID string, state InstanceState) error {
+func WaitInstanceState(ctx context.Context, r ReadRing, instanceID string, state InstanceState) error {
backoff := util.NewBackoff(ctx, util.BackoffConfig{
MinBackoff: 100 * time.Millisecond,
MaxBackoff: time.Second,
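Only the signature changes in this hunk (`*Ring` → `ReadRing`); the function body is not shown. A sketch of its plausible shape, inferred from the visible backoff setup — details assumed, not verbatim:

```go
// Sketch of the polling loop behind WaitInstanceState, assuming the backoff
// config visible above. It lives in package ring and uses Cortex's
// util.Backoff helper; the real body may differ in detail.
func waitInstanceStateSketch(ctx context.Context, r ReadRing, instanceID string, state InstanceState) error {
	backoff := util.NewBackoff(ctx, util.BackoffConfig{
		MinBackoff: 100 * time.Millisecond,
		MaxBackoff: time.Second,
	})
	for backoff.Ongoing() {
		if actual, err := r.GetInstanceState(instanceID); err == nil && actual == state {
			return nil
		}
		backoff.Wait()
	}
	// When the context times out or is cancelled, Ongoing() turns false and
	// Err() surfaces context.DeadlineExceeded / context.Canceled.
	return backoff.Err()
}
```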
115 changes: 115 additions & 0 deletions pkg/ring/util_test.go
@@ -6,10 +6,65 @@ import (
"testing"
"time"

"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)

type RingMock struct {
mock.Mock
}

func (r *RingMock) Collect(ch chan<- prometheus.Metric) {}

func (r *RingMock) Describe(ch chan<- *prometheus.Desc) {}

func (r *RingMock) Get(key uint32, op Operation, bufDescs []InstanceDesc, bufHosts, bufZones []string) (ReplicationSet, error) {
args := r.Called(key, op, bufDescs, bufHosts, bufZones)
return args.Get(0).(ReplicationSet), args.Error(1)
}

func (r *RingMock) GetAllHealthy(op Operation) (ReplicationSet, error) {
args := r.Called(op)
return args.Get(0).(ReplicationSet), args.Error(1)
}

func (r *RingMock) GetReplicationSetForOperation(op Operation) (ReplicationSet, error) {
args := r.Called(op)
return args.Get(0).(ReplicationSet), args.Error(1)
}

func (r *RingMock) ReplicationFactor() int {
return 0
}

func (r *RingMock) InstancesCount() int {
return 0
}

func (r *RingMock) ShuffleShard(identifier string, size int) ReadRing {
args := r.Called(identifier, size)
return args.Get(0).(ReadRing)
}

func (r *RingMock) GetInstanceState(instanceID string) (InstanceState, error) {
args := r.Called(instanceID)
return args.Get(0).(InstanceState), args.Error(1)
}

func (r *RingMock) ShuffleShardWithLookback(identifier string, size int, lookbackPeriod time.Duration, now time.Time) ReadRing {
args := r.Called(identifier, size, lookbackPeriod, now)
return args.Get(0).(ReadRing)
}

func (r *RingMock) HasInstance(instanceID string) bool {
return true
}

func (r *RingMock) CleanupShuffleShardCache(identifier string) {}

func TestGenerateTokens(t *testing.T) {
tokens := GenerateTokens(1000000, nil)

@@ -184,3 +239,63 @@ func TestWaitRingStabilityShouldReturnErrorIfMaxWaitingIsReached(t *testing.T) {

assert.InDelta(t, maxWaiting, elapsedTime, float64(2*time.Second))
}

func TestWaitInstanceStateTimeout(t *testing.T) {
t.Parallel()

const (
instanceID = "test"
timeoutDuration = time.Second
)

ctx, cancel := context.WithTimeout(context.Background(), timeoutDuration)
defer cancel()

ring := &RingMock{}
ring.On("GetInstanceState", mock.Anything, mock.Anything).Return(ACTIVE, nil)

err := WaitInstanceState(ctx, ring, instanceID, PENDING)

assert.Equal(t, context.DeadlineExceeded, err)
ring.AssertCalled(t, "GetInstanceState", instanceID)
}

func TestWaitInstanceStateTimeoutOnError(t *testing.T) {
t.Parallel()

const (
instanceID = "test"
timeoutDuration = time.Second
)

ctx, cancel := context.WithTimeout(context.Background(), timeoutDuration)
defer cancel()

ring := &RingMock{}
ring.On("GetInstanceState", mock.Anything, mock.Anything).Return(PENDING, errors.New("instance not found in the ring"))

err := WaitInstanceState(ctx, ring, instanceID, ACTIVE)

assert.Equal(t, context.DeadlineExceeded, err)
ring.AssertCalled(t, "GetInstanceState", instanceID)
}

func TestWaitInstanceStateExitsAfterActualStateEqualsState(t *testing.T) {
t.Parallel()

const (
instanceID = "test"
timeoutDuration = time.Second
)

ctx, cancel := context.WithTimeout(context.Background(), timeoutDuration)
defer cancel()

ring := &RingMock{}
ring.On("GetInstanceState", mock.Anything, mock.Anything).Return(ACTIVE, nil)

err := WaitInstanceState(ctx, ring, instanceID, ACTIVE)

assert.Nil(t, err)
ring.AssertNumberOfCalls(t, "GetInstanceState", 1)
}
