Add timeout for waiting on compactor to become ACTIVE in the ring. #4262

Merged: 17 commits, Jul 5, 2021
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -3,6 +3,7 @@
## master / unreleased

* [CHANGE] Querier / ruler: Change `-querier.max-fetched-chunks-per-query` configuration to limit the maximum number of chunks that can be fetched in a single query. The number of chunks fetched by ingesters AND long-term storage combined should not exceed the value configured on `-querier.max-fetched-chunks-per-query`. #4260
* [ENHANCEMENT] Add timeout for waiting on compactor to become ACTIVE in the ring. #4262
* [BUGFIX] HA Tracker: when cleaning up obsolete elected replicas from KV store, tracker didn't update the number of clusters per user correctly. #4336

## 1.10.0-rc.0 / 2021-06-28
4 changes: 4 additions & 0 deletions docs/blocks-storage/compactor.md
@@ -230,4 +230,8 @@ compactor:
# Name of network interface to read address from.
# CLI flag: -compactor.ring.instance-interface-names
[instance_interface_names: <list of string> | default = [eth0 en0]]

# Timeout for waiting on compactor to become ACTIVE in the ring.
# CLI flag: -compactor.ring.wait-active-instance-timeout
[wait_active_instance_timeout: <duration> | default = 10m]
```
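The new flag accepts standard Go duration syntax, so the 10-minute default can be overridden at the command line, for example `-compactor.ring.wait-active-instance-timeout=1m`.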
4 changes: 4 additions & 0 deletions docs/configuration/config-file-reference.md
@@ -5195,6 +5195,10 @@ sharding_ring:
# Name of network interface to read address from.
# CLI flag: -compactor.ring.instance-interface-names
[instance_interface_names: <list of string> | default = [eth0 en0]]

# Timeout for waiting on compactor to become ACTIVE in the ring.
# CLI flag: -compactor.ring.wait-active-instance-timeout
[wait_active_instance_timeout: <duration> | default = 10m]
```

### `store_gateway_config`
6 changes: 5 additions & 1 deletion pkg/compactor/compactor.go
@@ -393,7 +393,11 @@ func (c *Compactor) starting(ctx context.Context) error {
// users scanner depends on the ring (to check whether a user belongs
// to this shard or not).
level.Info(c.logger).Log("msg", "waiting until compactor is ACTIVE in the ring")
if err := ring.WaitInstanceState(ctx, c.ring, c.ringLifecycler.ID, ring.ACTIVE); err != nil {

ctxWithTimeout, cancel := context.WithTimeout(ctx, c.compactorCfg.ShardingRing.WaitActiveInstanceTimeout)
defer cancel()
if err := ring.WaitInstanceState(ctxWithTimeout, c.ring, c.ringLifecycler.ID, ring.ACTIVE); err != nil {
level.Error(c.logger).Log("msg", "compactor failed to become ACTIVE in the ring", "err", err)
return err
}
level.Info(c.logger).Log("msg", "compactor is ACTIVE in the ring")
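The pattern this hunk applies, in isolation: derive a child context with the configured timeout so the wait cannot block startup forever, and let `context.DeadlineExceeded` surface as the startup error. A minimal, self-contained sketch, where `waitUntilActive` is a hypothetical stand-in for `ring.WaitInstanceState`:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// waitUntilActive polls a condition until it holds or the context is done.
// Hypothetical stand-in for ring.WaitInstanceState.
func waitUntilActive(ctx context.Context, isActive func() bool) error {
	ticker := time.NewTicker(50 * time.Millisecond)
	defer ticker.Stop()
	for {
		if isActive() {
			return nil
		}
		select {
		case <-ctx.Done():
			return ctx.Err() // context.DeadlineExceeded after the timeout
		case <-ticker.C:
		}
	}
}

func main() {
	// Bound the wait, mirroring the compactor change above.
	ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
	defer cancel()

	err := waitUntilActive(ctx, func() bool { return false })
	fmt.Println(errors.Is(err, context.DeadlineExceeded)) // true
}
```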
9 changes: 8 additions & 1 deletion pkg/compactor/compactor_ring.go
@@ -34,6 +34,10 @@ type RingConfig struct {

// Injected internally
ListenPort int `yaml:"-"`

WaitActiveInstanceTimeout time.Duration `yaml:"wait_active_instance_timeout"`

ObservePeriod time.Duration `yaml:"-"`
}

// RegisterFlags adds the flags required to config this to the given FlagSet
@@ -59,6 +63,9 @@ func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet) {
f.StringVar(&cfg.InstanceAddr, "compactor.ring.instance-addr", "", "IP address to advertise in the ring.")
f.IntVar(&cfg.InstancePort, "compactor.ring.instance-port", 0, "Port to advertise in the ring (defaults to server.grpc-listen-port).")
f.StringVar(&cfg.InstanceID, "compactor.ring.instance-id", hostname, "Instance ID to register in the ring.")

// Timeout durations
f.DurationVar(&cfg.WaitActiveInstanceTimeout, "compactor.ring.wait-active-instance-timeout", 10*time.Minute, "Timeout for waiting on compactor to become ACTIVE in the ring.")
}

// ToLifecyclerConfig returns a LifecyclerConfig based on the compactor
@@ -87,7 +94,7 @@ func (cfg *RingConfig) ToLifecyclerConfig() ring.LifecyclerConfig {
lc.InfNames = cfg.InstanceInterfaceNames
lc.UnregisterOnShutdown = true
lc.HeartbeatPeriod = cfg.HeartbeatPeriod
lc.ObservePeriod = 0
lc.ObservePeriod = cfg.ObservePeriod
lc.JoinAfter = 0
lc.MinReadyDuration = 0
lc.FinalSleep = 0
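`ObservePeriod` is deliberately tagged `yaml:"-"`, so it is not a user-facing setting: it replaces the previously hard-coded `lc.ObservePeriod = 0` so that tests can lengthen the lifecycler's observe phase (see `TestCompactor_ShouldFailCompactionOnTimeout` below) while production keeps the zero default.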
33 changes: 33 additions & 0 deletions pkg/compactor/compactor_test.go
@@ -1092,6 +1092,9 @@ func prepareConfig() Config {
compactorCfg.ShardingRing.WaitStabilityMinDuration = 0
compactorCfg.ShardingRing.WaitStabilityMaxDuration = 0

// Use a lower timeout in unit tests for waiting on the compactor to become ACTIVE in the ring
compactorCfg.ShardingRing.WaitActiveInstanceTimeout = 5 * time.Second

return compactorCfg
}

@@ -1279,3 +1282,33 @@ func TestCompactor_DeleteLocalSyncFiles(t *testing.T) {
require.NotEqual(t, numUsers, c1Users)
require.Equal(t, numUsers, c1Users+c2Users)
}

func TestCompactor_ShouldFailCompactionOnTimeout(t *testing.T) {
t.Parallel()

// Mock the bucket
bucketClient := &bucket.ClientMock{}
bucketClient.MockIter("", []string{}, nil)

cfg := prepareConfig()
cfg.ShardingEnabled = true
cfg.ShardingRing.InstanceID = "compactor-1"
cfg.ShardingRing.InstanceAddr = "1.2.3.4"
cfg.ShardingRing.KVStore.Mock = consul.NewInMemoryClient(ring.GetCodec())

// Set ObservePeriod longer than the wait timeout to force a timeout while waiting for the instance to become ACTIVE in the ring
cfg.ShardingRing.ObservePeriod = time.Second * 10

c, _, _, logs, _ := prepare(t, cfg, bucketClient)

// Try to start the compactor. The long ObservePeriod set above keeps the instance from becoming ACTIVE before the wait timeout expires.
err := services.StartAndAwaitRunning(context.Background(), c)

// Assert that the compactor timed out
assert.Equal(t, context.DeadlineExceeded, err)

assert.ElementsMatch(t, []string{
`level=info component=compactor msg="waiting until compactor is ACTIVE in the ring"`,
`level=error component=compactor msg="compactor failed to become ACTIVE in the ring" err="context deadline exceeded"`,
}, removeIgnoredLogs(strings.Split(strings.TrimSpace(logs.String()), "\n")))
}
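The timing that makes this test deterministic: `prepareConfig` caps the wait at 5 seconds, while the 10-second `ObservePeriod` keeps the lifecycler observing the ring, not yet ACTIVE, past that deadline, so `StartAndAwaitRunning` returns `context.DeadlineExceeded`.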
4 changes: 4 additions & 0 deletions pkg/ring/ring.go
@@ -71,6 +71,10 @@ type ReadRing interface {
// and size (number of instances).
ShuffleShard(identifier string, size int) ReadRing

// GetInstanceState returns the current state of an instance or an error if the
// instance does not exist in the ring.
GetInstanceState(instanceID string) (InstanceState, error)

// ShuffleShardWithLookback is like ShuffleShard() but the returned subring includes
// all instances that have been part of the identifier's shard since "now - lookbackPeriod".
ShuffleShardWithLookback(identifier string, size int, lookbackPeriod time.Duration, now time.Time) ReadRing
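Promoting `GetInstanceState` into the `ReadRing` interface is what allows `WaitInstanceState` (next file) to accept the interface instead of a concrete `*Ring`, which in turn makes it testable against the `RingMock` added in `util_test.go`.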
2 changes: 1 addition & 1 deletion pkg/ring/util.go
@@ -69,7 +69,7 @@ func GetInstancePort(configPort, listenPort int) int {

// WaitInstanceState waits until the input instanceID is registered within the
// ring matching the provided state. A timeout should be provided within the context.
func WaitInstanceState(ctx context.Context, r *Ring, instanceID string, state InstanceState) error {
func WaitInstanceState(ctx context.Context, r ReadRing, instanceID string, state InstanceState) error {
backoff := util.NewBackoff(ctx, util.BackoffConfig{
MinBackoff: 100 * time.Millisecond,
MaxBackoff: time.Second,
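The remainder of the function falls outside this hunk. Based on the visible backoff configuration and the tests below, its shape is roughly the following reconstruction (a sketch, not the verbatim implementation):

```go
// Reconstructed sketch of the loop that follows the backoff setup above:
// poll GetInstanceState until the desired state is reached or the context
// deadline carried by the backoff expires.
for backoff.Ongoing() {
	if actual, err := r.GetInstanceState(instanceID); err == nil && actual == state {
		return nil
	}
	backoff.Wait()
}
// Err() reports the context error, e.g. context.DeadlineExceeded.
return backoff.Err()
```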
115 changes: 115 additions & 0 deletions pkg/ring/util_test.go
@@ -6,10 +6,65 @@ import (
"testing"
"time"

"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)

type RingMock struct {
mock.Mock
}

func (r *RingMock) Collect(ch chan<- prometheus.Metric) {}

func (r *RingMock) Describe(ch chan<- *prometheus.Desc) {}

func (r *RingMock) Get(key uint32, op Operation, bufDescs []InstanceDesc, bufHosts, bufZones []string) (ReplicationSet, error) {
args := r.Called(key, op, bufDescs, bufHosts, bufZones)
return args.Get(0).(ReplicationSet), args.Error(1)
}

func (r *RingMock) GetAllHealthy(op Operation) (ReplicationSet, error) {
args := r.Called(op)
return args.Get(0).(ReplicationSet), args.Error(1)
}

func (r *RingMock) GetReplicationSetForOperation(op Operation) (ReplicationSet, error) {
args := r.Called(op)
return args.Get(0).(ReplicationSet), args.Error(1)
}

func (r *RingMock) ReplicationFactor() int {
return 0
}

func (r *RingMock) InstancesCount() int {
return 0
}

func (r *RingMock) ShuffleShard(identifier string, size int) ReadRing {
args := r.Called(identifier, size)
return args.Get(0).(ReadRing)
}

func (r *RingMock) GetInstanceState(instanceID string) (InstanceState, error) {
args := r.Called(instanceID)
return args.Get(0).(InstanceState), args.Error(1)
}

func (r *RingMock) ShuffleShardWithLookback(identifier string, size int, lookbackPeriod time.Duration, now time.Time) ReadRing {
args := r.Called(identifier, size, lookbackPeriod, now)
return args.Get(0).(ReadRing)
}

func (r *RingMock) HasInstance(instanceID string) bool {
return true
}

func (r *RingMock) CleanupShuffleShardCache(identifier string) {}

func TestGenerateTokens(t *testing.T) {
tokens := GenerateTokens(1000000, nil)

@@ -184,3 +239,63 @@ func TestWaitRingStabilityShouldReturnErrorIfMaxWaitingIsReached(t *testing.T) {

assert.InDelta(t, maxWaiting, elapsedTime, float64(2*time.Second))
}

func TestWaitInstanceStateTimeout(t *testing.T) {
t.Parallel()

const (
instanceID = "test"
timeoutDuration = time.Second
)

ctx, cancel := context.WithTimeout(context.Background(), timeoutDuration)
defer cancel()

ring := &RingMock{}
ring.On("GetInstanceState", mock.Anything, mock.Anything).Return(ACTIVE, nil)

err := WaitInstanceState(ctx, ring, instanceID, PENDING)

assert.Equal(t, context.DeadlineExceeded, err)
ring.AssertCalled(t, "GetInstanceState", instanceID)
}

func TestWaitInstanceStateTimeoutOnError(t *testing.T) {
t.Parallel()

const (
instanceID = "test"
timeoutDuration = time.Second
)

ctx, cancel := context.WithTimeout(context.Background(), timeoutDuration)
defer cancel()

ring := &RingMock{}
ring.On("GetInstanceState", mock.Anything, mock.Anything).Return(PENDING, errors.New("instance not found in the ring"))

err := WaitInstanceState(ctx, ring, instanceID, ACTIVE)

assert.Equal(t, context.DeadlineExceeded, err)
ring.AssertCalled(t, "GetInstanceState", instanceID)
}

func TestWaitInstanceStateExitsAfterActualStateEqualsState(t *testing.T) {
t.Parallel()

const (
instanceID = "test"
timeoutDuration = time.Second
)

ctx, cancel := context.WithTimeout(context.Background(), timeoutDuration)
defer cancel()

ring := &RingMock{}
ring.On("GetInstanceState", mock.Anything, mock.Anything).Return(ACTIVE, nil)

err := WaitInstanceState(ctx, ring, instanceID, ACTIVE)

assert.Nil(t, err)
ring.AssertNumberOfCalls(t, "GetInstanceState", 1)
}