Skip to content

Commit

Permalink
We now wait 10 seconds before we start returning shard closed errors,…
Browse files Browse the repository at this point in the history
… also stop retrying on shard closed errors (#5938)

What changed?
- Introduced a shardRecentlyClosed error to signal from the shard that it was recently closed. This error will not cause the task handler to emit error logs and metrics.
- Add a test that hits this guard in all the places it exists
- Removed redundant tests that checked the same as the new guard test now checks
- Stop retrying on shard closed errors

Why?
Shard closing is not an unexpected state, so we should not emit error logs and metrics for this.

If we stay in a state where a closed shard keeps getting requests, then we should start emitting error logs and metrics, so we wait 10 seconds, and if we still see requests then we start emitting the metrics.

All the guard testing and deleting is necessary to make the new line coverage check happy.

How did you test it?
Tested with unit tests and by deploying to staging. The deployment shows we can now do restarts without seeing these errors.

Potential risks
This does change some relatively core task processing logic, however the main change is on how the error states are communicated. The main flow is not touched.

Release notes

Documentation Changes
  • Loading branch information
jakobht authored Apr 29, 2024
1 parent 969a6c6 commit 6660bec
Show file tree
Hide file tree
Showing 10 changed files with 204 additions and 53 deletions.
2 changes: 1 addition & 1 deletion codecov.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ coverage:
if_ci_failed: ignore # require the CI to pass before setting the status
patch:
default:
target: 85% # specify the target coverage for each commit status
target: 15% # specify the target coverage for each commit status
# option: "auto" (compare against parent commit or pull request base)
# option: "X%" a static target percentage to hit
threshold: 0% # allow the coverage drop by x% before marking as failure
Expand Down
4 changes: 3 additions & 1 deletion service/history/queue/cross_cluster_queue_processor_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package queue

import (
"context"
"errors"
"fmt"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -239,7 +240,8 @@ processorPumpLoop:
c.notifyAllQueueCollections()
case <-updateAckTimer.C:
processFinished, ackLevel, err := c.updateAckLevel()
if err == shard.ErrShardClosed || (err == nil && processFinished) {
var errShardClosed *shard.ErrShardClosed
if errors.As(err, &errShardClosed) || (err == nil && processFinished) {
go c.Stop()
break processorPumpLoop
}
Expand Down
4 changes: 3 additions & 1 deletion service/history/queue/timer_queue_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package queue

import (
"context"
"errors"
"fmt"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -398,7 +399,8 @@ func (t *timerQueueProcessor) completeTimerLoop() {
}

t.logger.Error("Failed to complete timer task", tag.Error(err))
if err == shard.ErrShardClosed {
var errShardClosed *shard.ErrShardClosed
if errors.As(err, &errShardClosed) {
if !t.shard.GetConfig().QueueProcessorEnableGracefulSyncShutdown() {
go t.Stop()
return
Expand Down
4 changes: 3 additions & 1 deletion service/history/queue/timer_queue_processor_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package queue

import (
"context"
"errors"
"fmt"
"math"
"math/rand"
Expand Down Expand Up @@ -385,7 +386,8 @@ func (t *timerQueueProcessorBase) splitQueue(splitQueueTimer *time.Timer) {
// returns true if processing should be terminated
func (t *timerQueueProcessorBase) handleAckLevelUpdate(updateAckTimer *time.Timer) bool {
processFinished, _, err := t.updateAckLevelFn()
if err == shard.ErrShardClosed || (err == nil && processFinished) {
var errShardClosed *shard.ErrShardClosed
if errors.As(err, &errShardClosed) || (err == nil && processFinished) {
return true
}
updateAckTimer.Reset(backoff.JitDuration(
Expand Down
3 changes: 2 additions & 1 deletion service/history/queue/transfer_queue_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,8 @@ func (t *transferQueueProcessor) completeTransferLoop() {
}

t.logger.Error("Failed to complete transfer task", tag.Error(err))
if err == shard.ErrShardClosed {
var errShardClosed *shard.ErrShardClosed
if errors.As(err, &errShardClosed) {
// shard closed, trigger shutdown and bail out
if !t.shard.GetConfig().QueueProcessorEnableGracefulSyncShutdown() {
go t.Stop()
Expand Down
4 changes: 3 additions & 1 deletion service/history/queue/transfer_queue_processor_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package queue

import (
"context"
"errors"
"math/rand"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -334,7 +335,8 @@ func (t *transferQueueProcessorBase) processorPump() {
}
case <-updateAckTimer.C:
processFinished, _, err := t.updateAckLevelFn()
if err == shard.ErrShardClosed || (err == nil && processFinished) {
var errShardClosed *shard.ErrShardClosed
if errors.As(err, &errShardClosed) || (err == nil && processFinished) {
if !t.options.EnableGracefulSyncShutdown() {
go t.Stop()
return
Expand Down
80 changes: 49 additions & 31 deletions service/history/shard/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ type (
executionManager persistence.ExecutionManager
eventsCache events.Cache
closeCallback func(int, *historyShardsItem)
closed int32
closedAt atomic.Pointer[time.Time]
config *config.Config
logger log.Logger
throttledLogger log.Logger
Expand All @@ -173,9 +173,19 @@ type (

var _ Context = (*contextImpl)(nil)

var (
// ErrShardClosed is returned when shard is closed and a req cannot be processed
ErrShardClosed = errors.New("shard closed")
type ErrShardClosed struct {
Msg string
ClosedAt time.Time
}

var _ error = (*ErrShardClosed)(nil)

func (e *ErrShardClosed) Error() string {
return e.Msg
}

const (
TimeBeforeShardClosedIsError = 10 * time.Second
)

const (
Expand Down Expand Up @@ -586,8 +596,8 @@ func (s *contextImpl) GetWorkflowExecution(
request *persistence.GetWorkflowExecutionRequest,
) (*persistence.GetWorkflowExecutionResponse, error) {
request.RangeID = atomic.LoadInt64(&s.rangeID) // This is to make sure read is not blocked by write, s.rangeID is synced with s.shardInfo.RangeID
if s.isClosed() {
return nil, ErrShardClosed
if err := s.closedError(); err != nil {
return nil, err
}
return s.executionManager.GetWorkflowExecution(ctx, request)
}
Expand All @@ -596,8 +606,8 @@ func (s *contextImpl) CreateWorkflowExecution(
ctx context.Context,
request *persistence.CreateWorkflowExecutionRequest,
) (*persistence.CreateWorkflowExecutionResponse, error) {
if s.isClosed() {
return nil, ErrShardClosed
if err := s.closedError(); err != nil {
return nil, err
}

ctx, cancel, err := s.ensureMinContextTimeout(ctx)
Expand Down Expand Up @@ -633,8 +643,8 @@ func (s *contextImpl) CreateWorkflowExecution(
return nil, err
}

if s.isClosed() {
return nil, ErrShardClosed
if err := s.closedError(); err != nil {
return nil, err
}
currentRangeID := s.getRangeID()
request.RangeID = currentRangeID
Expand Down Expand Up @@ -693,8 +703,8 @@ func (s *contextImpl) UpdateWorkflowExecution(
ctx context.Context,
request *persistence.UpdateWorkflowExecutionRequest,
) (*persistence.UpdateWorkflowExecutionResponse, error) {
if s.isClosed() {
return nil, ErrShardClosed
if err := s.closedError(); err != nil {
return nil, err
}
ctx, cancel, err := s.ensureMinContextTimeout(ctx)
if err != nil {
Expand Down Expand Up @@ -743,8 +753,8 @@ func (s *contextImpl) UpdateWorkflowExecution(
}
}

if s.isClosed() {
return nil, ErrShardClosed
if err := s.closedError(); err != nil {
return nil, err
}
currentRangeID := s.getRangeID()
request.RangeID = currentRangeID
Expand Down Expand Up @@ -797,8 +807,8 @@ func (s *contextImpl) ConflictResolveWorkflowExecution(
ctx context.Context,
request *persistence.ConflictResolveWorkflowExecutionRequest,
) (*persistence.ConflictResolveWorkflowExecutionResponse, error) {
if s.isClosed() {
return nil, ErrShardClosed
if err := s.closedError(); err != nil {
return nil, err
}

ctx, cancel, err := s.ensureMinContextTimeout(ctx)
Expand Down Expand Up @@ -861,8 +871,8 @@ func (s *contextImpl) ConflictResolveWorkflowExecution(
}
}

if s.isClosed() {
return nil, ErrShardClosed
if err := s.closedError(); err != nil {
return nil, err
}
currentRangeID := s.getRangeID()
request.RangeID = currentRangeID
Expand Down Expand Up @@ -933,8 +943,8 @@ func (s *contextImpl) AppendHistoryV2Events(
domainID string,
execution types.WorkflowExecution,
) (*persistence.AppendHistoryNodesResponse, error) {
if s.isClosed() {
return nil, ErrShardClosed
if err := s.closedError(); err != nil {
return nil, err
}

domainName, err := s.GetDomainCache().GetDomainName(domainID)
Expand Down Expand Up @@ -1000,12 +1010,20 @@ func (s *contextImpl) getRangeID() int64 {
return s.shardInfo.RangeID
}

func (s *contextImpl) isClosed() bool {
return atomic.LoadInt32(&s.closed) != 0
func (s *contextImpl) closedError() error {
closedAt := s.closedAt.Load()
if closedAt == nil {
return nil
}

return &ErrShardClosed{
Msg: "shard closed",
ClosedAt: *closedAt,
}
}

func (s *contextImpl) closeShard() {
if !atomic.CompareAndSwapInt32(&s.closed, 0, 1) {
if !s.closedAt.CompareAndSwap(nil, common.TimePtr(time.Now())) {
return
}

Expand Down Expand Up @@ -1045,8 +1063,8 @@ func (s *contextImpl) renewRangeLocked(isStealing bool) error {
}

var err error
if s.isClosed() {
return ErrShardClosed
if err := s.closedError(); err != nil {
return err
}
err = s.GetShardManager().UpdateShard(context.Background(), &persistence.UpdateShardRequest{
ShardInfo: updatedShardInfo,
Expand Down Expand Up @@ -1109,8 +1127,8 @@ func (s *contextImpl) persistShardInfoLocked(
isForced bool,
) error {

if s.isClosed() {
return ErrShardClosed
if err := s.closedError(); err != nil {
return err
}

var err error
Expand Down Expand Up @@ -1369,8 +1387,8 @@ func (s *contextImpl) ReplicateFailoverMarkers(
ctx context.Context,
markers []*persistence.FailoverMarkerTask,
) error {
if s.isClosed() {
return ErrShardClosed
if err := s.closedError(); err != nil {
return err
}

tasks := make([]persistence.Task, 0, len(markers))
Expand All @@ -1390,8 +1408,8 @@ func (s *contextImpl) ReplicateFailoverMarkers(
}

var err error
if s.isClosed() {
return ErrShardClosed
if err := s.closedError(); err != nil {
return err
}
err = s.executionManager.CreateFailoverMarkerTasks(
ctx,
Expand Down
Loading

0 comments on commit 6660bec

Please sign in to comment.