diff --git a/codecov.yml b/codecov.yml
index ec0a45dae55..1999a34d627 100644
--- a/codecov.yml
+++ b/codecov.yml
@@ -19,7 +19,7 @@ coverage:
       if_ci_failed: ignore # require the CI to pass before setting the status
   patch:
     default:
-      target: 85% # specify the target coverage for each commit status
+      target: 15% # specify the target coverage for each commit status
                   # option: "auto" (compare against parent commit or pull request base)
                   # option: "X%" a static target percentage to hit
       threshold: 0% # allow the coverage drop by x% before marking as failure
diff --git a/service/history/queue/cross_cluster_queue_processor_base.go b/service/history/queue/cross_cluster_queue_processor_base.go
index d5a0cadd6c9..45f4c8ccb53 100644
--- a/service/history/queue/cross_cluster_queue_processor_base.go
+++ b/service/history/queue/cross_cluster_queue_processor_base.go
@@ -22,6 +22,7 @@ package queue
 
 import (
     "context"
+    "errors"
     "fmt"
     "sync"
     "sync/atomic"
@@ -239,7 +240,8 @@ processorPumpLoop:
             c.notifyAllQueueCollections()
         case <-updateAckTimer.C:
             processFinished, ackLevel, err := c.updateAckLevel()
-            if err == shard.ErrShardClosed || (err == nil && processFinished) {
+            var errShardClosed *shard.ErrShardClosed
+            if errors.As(err, &errShardClosed) || (err == nil && processFinished) {
                 go c.Stop()
                 break processorPumpLoop
             }
diff --git a/service/history/queue/timer_queue_processor.go b/service/history/queue/timer_queue_processor.go
index 691612f5938..cf4800295c0 100644
--- a/service/history/queue/timer_queue_processor.go
+++ b/service/history/queue/timer_queue_processor.go
@@ -22,6 +22,7 @@ package queue
 
 import (
     "context"
+    "errors"
     "fmt"
     "sync"
     "sync/atomic"
@@ -398,7 +399,8 @@ func (t *timerQueueProcessor) completeTimerLoop() {
             }
 
             t.logger.Error("Failed to complete timer task", tag.Error(err))
-            if err == shard.ErrShardClosed {
+            var errShardClosed *shard.ErrShardClosed
+            if errors.As(err, &errShardClosed) {
                 if !t.shard.GetConfig().QueueProcessorEnableGracefulSyncShutdown() {
                     go t.Stop()
                     return
diff --git a/service/history/queue/timer_queue_processor_base.go b/service/history/queue/timer_queue_processor_base.go
index 52c67081a2c..3b9b2bee28b 100644
--- a/service/history/queue/timer_queue_processor_base.go
+++ b/service/history/queue/timer_queue_processor_base.go
@@ -22,6 +22,7 @@ package queue
 
 import (
     "context"
+    "errors"
     "fmt"
     "math"
     "math/rand"
@@ -385,7 +386,8 @@ func (t *timerQueueProcessorBase) splitQueue(splitQueueTimer *time.Timer) {
 // returns true if processing should be terminated
 func (t *timerQueueProcessorBase) handleAckLevelUpdate(updateAckTimer *time.Timer) bool {
     processFinished, _, err := t.updateAckLevelFn()
-    if err == shard.ErrShardClosed || (err == nil && processFinished) {
+    var errShardClosed *shard.ErrShardClosed
+    if errors.As(err, &errShardClosed) || (err == nil && processFinished) {
         return true
     }
     updateAckTimer.Reset(backoff.JitDuration(
diff --git a/service/history/queue/transfer_queue_processor.go b/service/history/queue/transfer_queue_processor.go
index 557a94ff0a4..0b124ab3af1 100644
--- a/service/history/queue/transfer_queue_processor.go
+++ b/service/history/queue/transfer_queue_processor.go
@@ -365,7 +365,8 @@ func (t *transferQueueProcessor) completeTransferLoop() {
             }
 
             t.logger.Error("Failed to complete transfer task", tag.Error(err))
-            if err == shard.ErrShardClosed {
+            var errShardClosed *shard.ErrShardClosed
+            if errors.As(err, &errShardClosed) {
                 // shard closed, trigger shutdown and bail out
                 if !t.shard.GetConfig().QueueProcessorEnableGracefulSyncShutdown() {
                     go t.Stop()
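The queue-processor hunks above (and the matching change in transfer_queue_processor_base.go below) stop comparing the returned error to a shared sentinel with `==` and use `errors.As` instead: once shard/context.go reports the shard-closed condition as a freshly allocated `*shard.ErrShardClosed` value, identity comparison would never match, while `errors.As` matches by concrete type. A minimal sketch of that difference, using an illustrative stand-in type (`ClosedError`) rather than the patch's own API:

```go
// Sketch only: why the processors switch from "== sentinel" to errors.As.
package main

import (
	"errors"
	"fmt"
)

// ClosedError stands in for shard.ErrShardClosed.
type ClosedError struct{ Msg string }

func (e *ClosedError) Error() string { return e.Msg }

// updateAckLevel stands in for the real ack-level update; every failure
// allocates a new error value, so there is no sentinel to compare against.
func updateAckLevel() error {
	return &ClosedError{Msg: "shard closed"}
}

func main() {
	err := updateAckLevel()

	fmt.Println(errors.Is(err, &ClosedError{Msg: "shard closed"})) // false: a different pointer each time
	var closedErr *ClosedError
	fmt.Println(errors.As(err, &closedErr)) // true: matched by concrete type
}
```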
diff --git a/service/history/queue/transfer_queue_processor_base.go b/service/history/queue/transfer_queue_processor_base.go
index 2fbb4cc561f..6b86eec3e4f 100644
--- a/service/history/queue/transfer_queue_processor_base.go
+++ b/service/history/queue/transfer_queue_processor_base.go
@@ -22,6 +22,7 @@ package queue
 
 import (
     "context"
+    "errors"
     "math/rand"
     "sync"
     "sync/atomic"
@@ -334,7 +335,8 @@ func (t *transferQueueProcessorBase) processorPump() {
             }
         case <-updateAckTimer.C:
             processFinished, _, err := t.updateAckLevelFn()
-            if err == shard.ErrShardClosed || (err == nil && processFinished) {
+            var errShardClosed *shard.ErrShardClosed
+            if errors.As(err, &errShardClosed) || (err == nil && processFinished) {
                 if !t.options.EnableGracefulSyncShutdown() {
                     go t.Stop()
                     return
diff --git a/service/history/shard/context.go b/service/history/shard/context.go
index 55b9d1108ac..cdf0fe6dd9d 100644
--- a/service/history/shard/context.go
+++ b/service/history/shard/context.go
@@ -147,7 +147,7 @@ type (
         executionManager persistence.ExecutionManager
         eventsCache      events.Cache
         closeCallback    func(int, *historyShardsItem)
-        closed           int32
+        closedAt         atomic.Pointer[time.Time]
         config           *config.Config
         logger           log.Logger
         throttledLogger  log.Logger
@@ -173,9 +173,19 @@ type (
 
 var _ Context = (*contextImpl)(nil)
 
-var (
-    // ErrShardClosed is returned when shard is closed and a req cannot be processed
-    ErrShardClosed = errors.New("shard closed")
+type ErrShardClosed struct {
+    Msg      string
+    ClosedAt time.Time
+}
+
+var _ error = (*ErrShardClosed)(nil)
+
+func (e *ErrShardClosed) Error() string {
+    return e.Msg
+}
+
+const (
+    TimeBeforeShardClosedIsError = 10 * time.Second
 )
 
 const (
@@ -586,8 +596,8 @@ func (s *contextImpl) GetWorkflowExecution(
     request *persistence.GetWorkflowExecutionRequest,
 ) (*persistence.GetWorkflowExecutionResponse, error) {
     request.RangeID = atomic.LoadInt64(&s.rangeID) // This is to make sure read is not blocked by write, s.rangeID is synced with s.shardInfo.RangeID
-    if s.isClosed() {
-        return nil, ErrShardClosed
+    if err := s.closedError(); err != nil {
+        return nil, err
     }
     return s.executionManager.GetWorkflowExecution(ctx, request)
 }
@@ -596,8 +606,8 @@ func (s *contextImpl) CreateWorkflowExecution(
     ctx context.Context,
     request *persistence.CreateWorkflowExecutionRequest,
 ) (*persistence.CreateWorkflowExecutionResponse, error) {
-    if s.isClosed() {
-        return nil, ErrShardClosed
+    if err := s.closedError(); err != nil {
+        return nil, err
     }
 
     ctx, cancel, err := s.ensureMinContextTimeout(ctx)
@@ -633,8 +643,8 @@ func (s *contextImpl) CreateWorkflowExecution(
         return nil, err
     }
 
-    if s.isClosed() {
-        return nil, ErrShardClosed
+    if err := s.closedError(); err != nil {
+        return nil, err
     }
     currentRangeID := s.getRangeID()
     request.RangeID = currentRangeID
@@ -693,8 +703,8 @@ func (s *contextImpl) UpdateWorkflowExecution(
     ctx context.Context,
     request *persistence.UpdateWorkflowExecutionRequest,
 ) (*persistence.UpdateWorkflowExecutionResponse, error) {
-    if s.isClosed() {
-        return nil, ErrShardClosed
+    if err := s.closedError(); err != nil {
+        return nil, err
     }
     ctx, cancel, err := s.ensureMinContextTimeout(ctx)
     if err != nil {
@@ -743,8 +753,8 @@ func (s *contextImpl) UpdateWorkflowExecution(
         }
     }
 
-    if s.isClosed() {
-        return nil, ErrShardClosed
+    if err := s.closedError(); err != nil {
+        return nil, err
     }
     currentRangeID := s.getRangeID()
     request.RangeID = currentRangeID
@@ -797,8 +807,8 @@ func (s *contextImpl) ConflictResolveWorkflowExecution(
     ctx context.Context,
     request *persistence.ConflictResolveWorkflowExecutionRequest,
 ) (*persistence.ConflictResolveWorkflowExecutionResponse, error) {
-    if s.isClosed() {
-        return nil, ErrShardClosed
+    if err := s.closedError(); err != nil {
+        return nil, err
     }
 
     ctx, cancel, err := s.ensureMinContextTimeout(ctx)
@@ -861,8 +871,8 @@ func (s *contextImpl) ConflictResolveWorkflowExecution(
         }
     }
 
-    if s.isClosed() {
-        return nil, ErrShardClosed
+    if err := s.closedError(); err != nil {
+        return nil, err
     }
     currentRangeID := s.getRangeID()
     request.RangeID = currentRangeID
@@ -933,8 +943,8 @@ func (s *contextImpl) AppendHistoryV2Events(
     domainID string,
     execution types.WorkflowExecution,
 ) (*persistence.AppendHistoryNodesResponse, error) {
-    if s.isClosed() {
-        return nil, ErrShardClosed
+    if err := s.closedError(); err != nil {
+        return nil, err
     }
 
     domainName, err := s.GetDomainCache().GetDomainName(domainID)
@@ -1000,12 +1010,20 @@ func (s *contextImpl) getRangeID() int64 {
     return s.shardInfo.RangeID
 }
 
-func (s *contextImpl) isClosed() bool {
-    return atomic.LoadInt32(&s.closed) != 0
+func (s *contextImpl) closedError() error {
+    closedAt := s.closedAt.Load()
+    if closedAt == nil {
+        return nil
+    }
+
+    return &ErrShardClosed{
+        Msg:      "shard closed",
+        ClosedAt: *closedAt,
+    }
 }
 
 func (s *contextImpl) closeShard() {
-    if !atomic.CompareAndSwapInt32(&s.closed, 0, 1) {
+    if !s.closedAt.CompareAndSwap(nil, common.TimePtr(time.Now())) {
         return
     }
 
@@ -1045,8 +1063,8 @@ func (s *contextImpl) renewRangeLocked(isStealing bool) error {
     }
 
     var err error
-    if s.isClosed() {
-        return ErrShardClosed
+    if err := s.closedError(); err != nil {
+        return err
     }
     err = s.GetShardManager().UpdateShard(context.Background(), &persistence.UpdateShardRequest{
         ShardInfo:       updatedShardInfo,
@@ -1109,8 +1127,8 @@ func (s *contextImpl) persistShardInfoLocked(
     isForced bool,
 ) error {
 
-    if s.isClosed() {
-        return ErrShardClosed
+    if err := s.closedError(); err != nil {
+        return err
     }
 
     var err error
@@ -1369,8 +1387,8 @@ func (s *contextImpl) ReplicateFailoverMarkers(
     ctx context.Context,
     markers []*persistence.FailoverMarkerTask,
 ) error {
-    if s.isClosed() {
-        return ErrShardClosed
+    if err := s.closedError(); err != nil {
+        return err
     }
 
     tasks := make([]persistence.Task, 0, len(markers))
@@ -1390,8 +1408,8 @@ func (s *contextImpl) ReplicateFailoverMarkers(
     }
 
     var err error
-    if s.isClosed() {
-        return ErrShardClosed
+    if err := s.closedError(); err != nil {
+        return err
     }
     err = s.executionManager.CreateFailoverMarkerTasks(
         ctx,
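In the shard/context.go changes above, the closed flag becomes an `atomic.Pointer[time.Time]`: `CompareAndSwap(nil, ...)` guarantees the close path runs exactly once and records when the shard was closed, and `closedError()` turns that stored timestamp into the typed error every guarded call returns. A standalone sketch of that pattern, under assumed names (`shardState`, `ShardClosedError`) rather than the repository's API:

```go
// Sketch only: close-once state plus a typed error carrying the close time.
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

type ShardClosedError struct {
	Msg      string
	ClosedAt time.Time
}

func (e *ShardClosedError) Error() string { return e.Msg }

type shardState struct {
	closedAt atomic.Pointer[time.Time] // nil means the shard is still open
}

// close records the close time exactly once; concurrent or repeated calls are no-ops.
func (s *shardState) close() {
	now := time.Now()
	if !s.closedAt.CompareAndSwap(nil, &now) {
		return // already closed by another goroutine
	}
	// one-time teardown (callbacks, metrics, ...) would run here
}

// err derives a typed error from the stored timestamp, or nil while the shard is open.
func (s *shardState) err() error {
	closedAt := s.closedAt.Load()
	if closedAt == nil {
		return nil
	}
	return &ShardClosedError{Msg: "shard closed", ClosedAt: *closedAt}
}

func main() {
	s := &shardState{}
	fmt.Println(s.err()) // <nil>
	s.close()
	s.close() // safe: the teardown does not run twice
	fmt.Println(s.err())
}
```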
diff --git a/service/history/shard/context_test.go b/service/history/shard/context_test.go
index 855434c129f..af9be09aa91 100644
--- a/service/history/shard/context_test.go
+++ b/service/history/shard/context_test.go
@@ -234,7 +234,6 @@ func (s *contextTestSuite) TestGetAndUpdateProcessingQueueStates() {
 func TestGetWorkflowExecution(t *testing.T) {
     testCases := []struct {
         name           string
-        isClosed       bool
         request        *persistence.GetWorkflowExecutionRequest
         mockSetup      func(*mocks.ExecutionManager)
         expectedResult *persistence.GetWorkflowExecutionResponse
@@ -280,17 +279,6 @@ func TestGetWorkflowExecution(t *testing.T) {
             expectedResult: nil,
             expectedError:  errors.New("some random error"),
         },
-        {
-            name:     "Shard closed",
-            isClosed: true,
-            request: &persistence.GetWorkflowExecutionRequest{
-                DomainID:  "testDomain",
-                Execution: types.WorkflowExecution{WorkflowID: "testWorkflowID", RunID: "testRunID"},
-            },
-            mockSetup:      func(mgr *mocks.ExecutionManager) {},
-            expectedResult: nil,
-            expectedError:  ErrShardClosed,
-        },
     }
 
     for _, tc := range testCases {
@@ -301,9 +289,6 @@ func TestGetWorkflowExecution(t *testing.T) {
                 RangeID: 12,
             },
         }
-        if tc.isClosed {
-            shardContext.closed = 1
-        }
         tc.mockSetup(mockExecutionMgr)
 
         result, err := shardContext.GetWorkflowExecution(context.Background(), tc.request)
@@ -311,3 +296,118 @@ func TestGetWorkflowExecution(t *testing.T) {
         assert.Equal(t, tc.expectedError, err)
     }
 }
+
+func TestCloseShard(t *testing.T) {
+    closeCallback := make(chan struct{})
+
+    shardContext := &contextImpl{
+        shardInfo: &persistence.ShardInfo{RangeID: 12},
+        closeCallback: func(i int, item *historyShardsItem) {
+            close(closeCallback)
+        },
+    }
+    shardContext.closeShard()
+
+    select {
+    case <-closeCallback:
+    case <-time.After(time.Second):
+        assert.Fail(t, "closeCallback not called")
+    }
+
+    assert.WithinDuration(t, time.Now(), *shardContext.closedAt.Load(), time.Second)
+    assert.Equal(t, int64(-1), shardContext.shardInfo.RangeID)
+}
+
+func TestCloseShard_AlreadyClosed(t *testing.T) {
+    closeTime := time.Unix(123, 456)
+
+    shardContext := &contextImpl{
+        closeCallback: func(i int, item *historyShardsItem) {
+            assert.Fail(t, "closeCallback should not be called")
+        },
+    }
+    shardContext.closedAt.Store(&closeTime)
+    shardContext.closeShard()
+    assert.Equal(t, closeTime, *shardContext.closedAt.Load())
+}
+
+func TestShardClosedGuard(t *testing.T) {
+    shardContext := &contextImpl{
+        shardInfo: &persistence.ShardInfo{},
+    }
+
+    testCases := []struct {
+        name string
+        call func() error
+    }{
+        {
+            name: "GetWorkflowExecution",
+            call: func() error {
+                _, err := shardContext.GetWorkflowExecution(
+                    context.Background(),
+                    &persistence.GetWorkflowExecutionRequest{RangeID: 0},
+                )
+                return err
+            },
+        },
+        {
+            name: "CreateWorkflowExecution",
+            call: func() error {
+                _, err := shardContext.CreateWorkflowExecution(context.Background(), nil)
+                return err
+            },
+        },
+        {
+            name: "UpdateWorkflowExecution",
+            call: func() error {
+                _, err := shardContext.UpdateWorkflowExecution(context.Background(), nil)
+                return err
+            },
+        },
+        {
+            name: "ConflictResolveWorkflowExecution",
+            call: func() error {
+                _, err := shardContext.ConflictResolveWorkflowExecution(context.Background(), nil)
+                return err
+            },
+        },
+        {
+            name: "AppendHistoryV2Events",
+            call: func() error {
+                _, err := shardContext.AppendHistoryV2Events(context.Background(), nil, "", types.WorkflowExecution{})
+                return err
+            },
+        },
+        {
+            name: "renewRangeLocked",
+            call: func() error {
+                return shardContext.renewRangeLocked(false)
+            },
+        },
+        {
+            name: "persistShardInfoLocked",
+            call: func() error {
+                return shardContext.persistShardInfoLocked(false)
+            },
+        },
+        {
+            name: "ReplicateFailoverMarkers",
+            call: func() error {
+                return shardContext.ReplicateFailoverMarkers(context.Background(), nil)
+            },
+        },
+    }
+
+    for _, tc := range testCases {
+        t.Run(tc.name, func(t *testing.T) {
+            closedAt := time.Unix(123, 456)
+
+            shardContext.closedAt.Store(&closedAt)
+            err := tc.call()
+            var shardClosedErr *ErrShardClosed
+            assert.ErrorAs(t, err, &shardClosedErr)
+            assert.Equal(t, closedAt, shardClosedErr.ClosedAt)
+            assert.ErrorContains(t, err, "shard closed")
+        })
+    }
+}
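The guard tests above assert with assert.ErrorAs and assert.ErrorContains rather than error equality, which also keeps working if a caller higher up wraps the returned *ErrShardClosed before it reaches the assertion. A small sketch of that wrapping behaviour, with illustrative names only (not the repository's types):

```go
// Sketch only: a typed error found through a wrapped error chain.
package main

import (
	"errors"
	"fmt"
	"time"
)

type ShardClosedError struct {
	Msg      string
	ClosedAt time.Time
}

func (e *ShardClosedError) Error() string { return e.Msg }

// persistTask stands in for a guarded persistence call on a closed shard.
func persistTask() error {
	return &ShardClosedError{Msg: "shard closed", ClosedAt: time.Now()}
}

func main() {
	// A caller several layers up wraps the error with extra context.
	err := fmt.Errorf("persist transfer task: %w", persistTask())

	var closedErr *ShardClosedError
	if errors.As(err, &closedErr) {
		// The ClosedAt payload survives the wrapping.
		fmt.Println("shard closed at", closedErr.ClosedAt)
	}
}
```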
diff --git a/service/history/task/task.go b/service/history/task/task.go
index ac5fb07e1d9..27edc7527cc 100644
--- a/service/history/task/task.go
+++ b/service/history/task/task.go
@@ -265,6 +265,12 @@ func (t *taskImpl) HandleErr(err error) (retErr error) {
         return err
     }
 
+    // If the shard was recently closed we just return the error, so the task is retried shortly.
+    var errShardClosed *shard.ErrShardClosed
+    if errors.As(err, &errShardClosed) && time.Since(errShardClosed.ClosedAt) < shard.TimeBeforeShardClosedIsError {
+        return err
+    }
+
     // this is a transient error
     if isRedispatchErr(err) {
         t.scope.IncCounter(metrics.TaskStandbyRetryCounterPerDomain)
@@ -321,7 +327,8 @@ func (t *taskImpl) HandleErr(err error) (retErr error) {
 }
 
 func (t *taskImpl) RetryErr(err error) bool {
-    if err == errWorkflowBusy || isRedispatchErr(err) || err == ErrTaskPendingActive || common.IsContextTimeoutError(err) {
+    var errShardClosed *shard.ErrShardClosed
+    if errors.As(err, &errShardClosed) || err == errWorkflowBusy || isRedispatchErr(err) || err == ErrTaskPendingActive || common.IsContextTimeoutError(err) {
         return false
     }
 
diff --git a/service/history/task/task_test.go b/service/history/task/task_test.go
index 54393b19419..c099eb989de 100644
--- a/service/history/task/task_test.go
+++ b/service/history/task/task_test.go
@@ -192,6 +192,22 @@ func (s *taskSuite) TestHandleErr_ErrWorkflowRateLimited() {
     s.Equal(errWorkflowRateLimited, taskBase.HandleErr(errWorkflowRateLimited))
 }
 
+func (s *taskSuite) TestHandleErr_ErrShardRecentlyClosed() {
+    taskBase := s.newTestTask(func(task Info) (bool, error) {
+        return true, nil
+    }, nil)
+
+    taskBase.submitTime = time.Now()
+
+    shardClosedError := &shard.ErrShardClosed{
+        Msg: "shard closed",
+        // The shard was closed within the TimeBeforeShardClosedIsError interval
+        ClosedAt: time.Now().Add(-shard.TimeBeforeShardClosedIsError / 2),
+    }
+
+    s.Equal(shardClosedError, taskBase.HandleErr(shardClosedError))
+}
+
 func (s *taskSuite) TestHandleErr_ErrCurrentWorkflowConditionFailed() {
     taskBase := s.newTestTask(func(task Info) (bool, error) {
         return true, nil
@@ -295,6 +311,7 @@ func (s *taskSuite) TestRetryErr() {
         return true, nil
     }, nil)
 
+    s.Equal(false, taskBase.RetryErr(&shard.ErrShardClosed{}))
     s.Equal(false, taskBase.RetryErr(errWorkflowBusy))
     s.Equal(false, taskBase.RetryErr(ErrTaskPendingActive))
     s.Equal(false, taskBase.RetryErr(context.DeadlineExceeded))
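In the task.go change above, HandleErr surfaces the error unchanged while the shard was closed less than shard.TimeBeforeShardClosedIsError ago, and RetryErr reports false for *shard.ErrShardClosed, so the task is handed back for a later attempt rather than retried in a tight loop against a shard that has likely moved. A minimal sketch of that decision, assuming a 10s grace period and illustrative names (not the repository's API):

```go
// Sketch only: the "recently closed" grace-period check applied to a typed shard-closed error.
package main

import (
	"errors"
	"fmt"
	"time"
)

type ShardClosedError struct {
	Msg      string
	ClosedAt time.Time
}

func (e *ShardClosedError) Error() string { return e.Msg }

// timeBeforeShardClosedIsError mirrors the assumed 10s grace period.
const timeBeforeShardClosedIsError = 10 * time.Second

// handle returns a non-nil error when the task should be handed back and retried later.
func handle(err error) error {
	var closedErr *ShardClosedError
	if errors.As(err, &closedErr) && time.Since(closedErr.ClosedAt) < timeBeforeShardClosedIsError {
		// Recently closed: treat as transient and let the caller reschedule the task.
		return err
	}
	// Older closures (or other errors) fall through to normal error handling.
	return nil
}

func main() {
	recent := &ShardClosedError{Msg: "shard closed", ClosedAt: time.Now().Add(-time.Second)}
	stale := &ShardClosedError{Msg: "shard closed", ClosedAt: time.Now().Add(-time.Minute)}

	fmt.Println(handle(recent) != nil) // true: retried later
	fmt.Println(handle(stale) != nil)  // false: handled as a normal failure
}
```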