diff --git a/CHANGELOG.md b/CHANGELOG.md
index 00ff7a81c2..2f825f9bf2 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -3,6 +3,7 @@
 ## master / unreleased
 
 * [CHANGE] Querier / ruler: Change `-querier.max-fetched-chunks-per-query` configuration to limit to maximum number of chunks that can be fetched in a single query. The number of chunks fetched by ingesters AND long-term storare combined should not exceed the value configured on `-querier.max-fetched-chunks-per-query`. #4260
+* [ENHANCEMENT] Add timeout for waiting on compactor to become ACTIVE in the ring. #4262
 * [BUGFIX] HA Tracker: when cleaning up obsolete elected replicas from KV store, tracker didn't update number of cluster per user correctly. #4336
 
 ## 1.10.0-rc.0 / 2021-06-28
diff --git a/docs/blocks-storage/compactor.md b/docs/blocks-storage/compactor.md
index 213405822c..280545afab 100644
--- a/docs/blocks-storage/compactor.md
+++ b/docs/blocks-storage/compactor.md
@@ -230,4 +230,8 @@ compactor:
   # Name of network interface to read address from.
   # CLI flag: -compactor.ring.instance-interface-names
   [instance_interface_names: <list of string> | default = [eth0 en0]]
+
+  # Timeout for waiting on compactor to become ACTIVE in the ring.
+  # CLI flag: -compactor.ring.wait-active-instance-timeout
+  [wait_active_instance_timeout: <duration> | default = 10m]
 ```
diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md
index 9b20d81bea..0267d01e88 100644
--- a/docs/configuration/config-file-reference.md
+++ b/docs/configuration/config-file-reference.md
@@ -5195,6 +5195,10 @@ sharding_ring:
   # Name of network interface to read address from.
   # CLI flag: -compactor.ring.instance-interface-names
   [instance_interface_names: <list of string> | default = [eth0 en0]]
+
+  # Timeout for waiting on compactor to become ACTIVE in the ring.
+  # CLI flag: -compactor.ring.wait-active-instance-timeout
+  [wait_active_instance_timeout: <duration> | default = 10m]
 ```
 
 ### `store_gateway_config`
diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go
index c41f7a9ccc..0af2088bd5 100644
--- a/pkg/compactor/compactor.go
+++ b/pkg/compactor/compactor.go
@@ -393,7 +393,11 @@ func (c *Compactor) starting(ctx context.Context) error {
 		// users scanner depends on the ring (to check whether an user belongs
 		// to this shard or not).
 		level.Info(c.logger).Log("msg", "waiting until compactor is ACTIVE in the ring")
-		if err := ring.WaitInstanceState(ctx, c.ring, c.ringLifecycler.ID, ring.ACTIVE); err != nil {
+
+		ctxWithTimeout, cancel := context.WithTimeout(ctx, c.compactorCfg.ShardingRing.WaitActiveInstanceTimeout)
+		defer cancel()
+		if err := ring.WaitInstanceState(ctxWithTimeout, c.ring, c.ringLifecycler.ID, ring.ACTIVE); err != nil {
+			level.Error(c.logger).Log("msg", "compactor failed to become ACTIVE in the ring", "err", err)
 			return err
 		}
 		level.Info(c.logger).Log("msg", "compactor is ACTIVE in the ring")
diff --git a/pkg/compactor/compactor_ring.go b/pkg/compactor/compactor_ring.go
index 74c3a77ad3..5b55c8a871 100644
--- a/pkg/compactor/compactor_ring.go
+++ b/pkg/compactor/compactor_ring.go
@@ -34,6 +34,10 @@ type RingConfig struct {
 
 	// Injected internally
 	ListenPort int `yaml:"-"`
+
+	WaitActiveInstanceTimeout time.Duration `yaml:"wait_active_instance_timeout"`
+
+	ObservePeriod time.Duration `yaml:"-"`
 }
 
 // RegisterFlags adds the flags required to config this to the given FlagSet
@@ -59,6 +63,9 @@ func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet) {
 	f.StringVar(&cfg.InstanceAddr, "compactor.ring.instance-addr", "", "IP address to advertise in the ring.")
 	f.IntVar(&cfg.InstancePort, "compactor.ring.instance-port", 0, "Port to advertise in the ring (defaults to server.grpc-listen-port).")
 	f.StringVar(&cfg.InstanceID, "compactor.ring.instance-id", hostname, "Instance ID to register in the ring.")
+
+	// Timeout durations
+	f.DurationVar(&cfg.WaitActiveInstanceTimeout, "compactor.ring.wait-active-instance-timeout", 10*time.Minute, "Timeout for waiting on compactor to become ACTIVE in the ring.")
 }
 
 // ToLifecyclerConfig returns a LifecyclerConfig based on the compactor
@@ -87,7 +94,7 @@ func (cfg *RingConfig) ToLifecyclerConfig() ring.LifecyclerConfig {
 	lc.InfNames = cfg.InstanceInterfaceNames
 	lc.UnregisterOnShutdown = true
 	lc.HeartbeatPeriod = cfg.HeartbeatPeriod
-	lc.ObservePeriod = 0
+	lc.ObservePeriod = cfg.ObservePeriod
 	lc.JoinAfter = 0
 	lc.MinReadyDuration = 0
 	lc.FinalSleep = 0
diff --git a/pkg/compactor/compactor_test.go b/pkg/compactor/compactor_test.go
index 9c14cb0924..da1a83cead 100644
--- a/pkg/compactor/compactor_test.go
+++ b/pkg/compactor/compactor_test.go
@@ -1092,6 +1092,9 @@ func prepareConfig() Config {
 	compactorCfg.ShardingRing.WaitStabilityMinDuration = 0
 	compactorCfg.ShardingRing.WaitStabilityMaxDuration = 0
 
+	// Set lower timeout for waiting on compactor to become ACTIVE in the ring for unit tests
+	compactorCfg.ShardingRing.WaitActiveInstanceTimeout = 5 * time.Second
+
 	return compactorCfg
 }
 
@@ -1279,3 +1282,33 @@ func TestCompactor_DeleteLocalSyncFiles(t *testing.T) {
 	require.NotEqual(t, numUsers, c1Users)
 	require.Equal(t, numUsers, c1Users+c2Users)
 }
+
+func TestCompactor_ShouldFailCompactionOnTimeout(t *testing.T) {
+	t.Parallel()
+
+	// Mock the bucket
+	bucketClient := &bucket.ClientMock{}
+	bucketClient.MockIter("", []string{}, nil)
+
+	cfg := prepareConfig()
+	cfg.ShardingEnabled = true
+	cfg.ShardingRing.InstanceID = "compactor-1"
+	cfg.ShardingRing.InstanceAddr = "1.2.3.4"
+	cfg.ShardingRing.KVStore.Mock = consul.NewInMemoryClient(ring.GetCodec())
+
+	// Set ObservePeriod to longer than the timeout period to mock a timeout while waiting on ring to become ACTIVE
+	cfg.ShardingRing.ObservePeriod = time.Second * 10
+
+	c, _, _, logs, _ := prepare(t, cfg, bucketClient)
+
+	// Try to start the compactor with a bad consul kv-store. The
+	err := services.StartAndAwaitRunning(context.Background(), c)
+
+	// Assert that the compactor timed out
+	assert.Equal(t, context.DeadlineExceeded, err)
+
+	assert.ElementsMatch(t, []string{
+		`level=info component=compactor msg="waiting until compactor is ACTIVE in the ring"`,
+		`level=error component=compactor msg="compactor failed to become ACTIVE in the ring" err="context deadline exceeded"`,
+	}, removeIgnoredLogs(strings.Split(strings.TrimSpace(logs.String()), "\n")))
+}
diff --git a/pkg/ring/ring.go b/pkg/ring/ring.go
index 539ec65e75..8d5bd0c925 100644
--- a/pkg/ring/ring.go
+++ b/pkg/ring/ring.go
@@ -71,6 +71,10 @@ type ReadRing interface {
 	// and size (number of instances).
 	ShuffleShard(identifier string, size int) ReadRing
 
+	// GetInstanceState returns the current state of an instance or an error if the
+	// instance does not exist in the ring.
+	GetInstanceState(instanceID string) (InstanceState, error)
+
 	// ShuffleShardWithLookback is like ShuffleShard() but the returned subring includes
 	// all instances that have been part of the identifier's shard since "now - lookbackPeriod".
 	ShuffleShardWithLookback(identifier string, size int, lookbackPeriod time.Duration, now time.Time) ReadRing
diff --git a/pkg/ring/util.go b/pkg/ring/util.go
index ac5c27388c..06e053dd26 100644
--- a/pkg/ring/util.go
+++ b/pkg/ring/util.go
@@ -69,7 +69,7 @@ func GetInstancePort(configPort, listenPort int) int {
 
 // WaitInstanceState waits until the input instanceID is registered within the
 // ring matching the provided state. A timeout should be provided within the context.
-func WaitInstanceState(ctx context.Context, r *Ring, instanceID string, state InstanceState) error {
+func WaitInstanceState(ctx context.Context, r ReadRing, instanceID string, state InstanceState) error {
 	backoff := util.NewBackoff(ctx, util.BackoffConfig{
 		MinBackoff: 100 * time.Millisecond,
 		MaxBackoff: time.Second,
diff --git a/pkg/ring/util_test.go b/pkg/ring/util_test.go
index c3a3e037e9..9a4a69c690 100644
--- a/pkg/ring/util_test.go
+++ b/pkg/ring/util_test.go
@@ -6,10 +6,65 @@ import (
 	"testing"
 	"time"
 
+	"github.com/pkg/errors"
+	"github.com/prometheus/client_golang/prometheus"
 	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/mock"
 	"github.com/stretchr/testify/require"
 )
 
+type RingMock struct {
+	mock.Mock
+}
+
+func (r *RingMock) Collect(ch chan<- prometheus.Metric) {}
+
+func (r *RingMock) Describe(ch chan<- *prometheus.Desc) {}
+
+func (r *RingMock) Get(key uint32, op Operation, bufDescs []InstanceDesc, bufHosts, bufZones []string) (ReplicationSet, error) {
+	args := r.Called(key, op, bufDescs, bufHosts, bufZones)
+	return args.Get(0).(ReplicationSet), args.Error(1)
+}
+
+func (r *RingMock) GetAllHealthy(op Operation) (ReplicationSet, error) {
+	args := r.Called(op)
+	return args.Get(0).(ReplicationSet), args.Error(1)
+}
+
+func (r *RingMock) GetReplicationSetForOperation(op Operation) (ReplicationSet, error) {
+	args := r.Called(op)
+	return args.Get(0).(ReplicationSet), args.Error(1)
+}
+
+func (r *RingMock) ReplicationFactor() int {
+	return 0
+}
+
+func (r *RingMock) InstancesCount() int {
+	return 0
+}
+
+func (r *RingMock) ShuffleShard(identifier string, size int) ReadRing {
+	args := r.Called(identifier, size)
+	return args.Get(0).(ReadRing)
+}
+
+func (r *RingMock) GetInstanceState(instanceID string) (InstanceState, error) {
+	args := r.Called(instanceID)
+	return args.Get(0).(InstanceState), args.Error(1)
+}
+
+func (r *RingMock) ShuffleShardWithLookback(identifier string, size int, lookbackPeriod time.Duration, now time.Time) ReadRing {
+	args := r.Called(identifier, size, lookbackPeriod, now)
+	return args.Get(0).(ReadRing)
+}
+
+func (r *RingMock) HasInstance(instanceID string) bool {
+	return true
+}
+
+func (r *RingMock) CleanupShuffleShardCache(identifier string) {}
+
 func TestGenerateTokens(t *testing.T) {
 	tokens := GenerateTokens(1000000, nil)
 
@@ -184,3 +239,63 @@ func TestWaitRingStabilityShouldReturnErrorIfMaxWaitingIsReached(t *testing.T) {
 
 	assert.InDelta(t, maxWaiting, elapsedTime, float64(2*time.Second))
 }
+
+func TestWaitInstanceStateTimeout(t *testing.T) {
+	t.Parallel()
+
+	const (
+		instanceID      = "test"
+		timeoutDuration = time.Second
+	)
+
+	ctx, cancel := context.WithTimeout(context.Background(), timeoutDuration)
+	defer cancel()
+
+	ring := &RingMock{}
+	ring.On("GetInstanceState", mock.Anything, mock.Anything).Return(ACTIVE, nil)
+
+	err := WaitInstanceState(ctx, ring, instanceID, PENDING)
+
+	assert.Equal(t, context.DeadlineExceeded, err)
+	ring.AssertCalled(t, "GetInstanceState", instanceID)
+}
+
+func TestWaitInstanceStateTimeoutOnError(t *testing.T) {
+	t.Parallel()
+
+	const (
+		instanceID      = "test"
+		timeoutDuration = time.Second
+	)
+
+	ctx, cancel := context.WithTimeout(context.Background(), timeoutDuration)
+	defer cancel()
+
+	ring := &RingMock{}
+	ring.On("GetInstanceState", mock.Anything, mock.Anything).Return(PENDING, errors.New("instance not found in the ring"))
+
+	err := WaitInstanceState(ctx, ring, instanceID, ACTIVE)
+
+	assert.Equal(t, context.DeadlineExceeded, err)
+	ring.AssertCalled(t, "GetInstanceState", instanceID)
+}
+
+func TestWaitInstanceStateExitsAfterActualStateEqualsState(t *testing.T) {
+	t.Parallel()
+
+	const (
+		instanceID      = "test"
+		timeoutDuration = time.Second
+	)
+
+	ctx, cancel := context.WithTimeout(context.Background(), timeoutDuration)
+	defer cancel()
+
+	ring := &RingMock{}
+	ring.On("GetInstanceState", mock.Anything, mock.Anything).Return(ACTIVE, nil)
+
+	err := WaitInstanceState(ctx, ring, instanceID, ACTIVE)
+
+	assert.Nil(t, err)
+	ring.AssertNumberOfCalls(t, "GetInstanceState", 1)
+}