diff --git a/common/daemon.go b/common/daemon.go
index d43d77fa166..c3a0156ade1 100644
--- a/common/daemon.go
+++ b/common/daemon.go
@@ -20,6 +20,17 @@
 
 package common
 
+const (
+	// used for background threads
+
+	// DaemonStatusInitialized coroutine pool initialized
+	DaemonStatusInitialized int32 = 0
+	// DaemonStatusStarted coroutine pool started
+	DaemonStatusStarted int32 = 1
+	// DaemonStatusStopped coroutine pool stopped
+	DaemonStatusStopped int32 = 2
+)
+
 type (
 	// Daemon is the base interfaces implemented by
 	// background tasks within cherami
diff --git a/service/history/historyEngine2_test.go b/service/history/historyEngine2_test.go
index 68c38a8f0cc..8be240f8320 100644
--- a/service/history/historyEngine2_test.go
+++ b/service/history/historyEngine2_test.go
@@ -956,7 +956,6 @@ func (s *engine2Suite) TestStartWorkflowExecution_StillRunning_Dedup() {
 	requestID := "requestID"
 
 	s.mockHistoryMgr.On("AppendHistoryEvents", mock.Anything).Return(nil).Once()
-	s.mockShardManager.On("UpdateShard", mock.Anything).Return(nil).Once()
 	s.mockExecutionMgr.On("CreateWorkflowExecution", mock.Anything).Return(nil, &persistence.WorkflowExecutionAlreadyStartedError{
 		Msg:            "random message",
 		StartRequestID: requestID,
@@ -1005,7 +1004,6 @@ func (s *engine2Suite) TestStartWorkflowExecution_StillRunning_NonDeDup() {
 	identity := "testIdentity"
 
 	s.mockHistoryMgr.On("AppendHistoryEvents", mock.Anything).Return(nil).Once()
-	s.mockShardManager.On("UpdateShard", mock.Anything).Return(nil).Once()
 	s.mockExecutionMgr.On("CreateWorkflowExecution", mock.Anything).Return(nil, &persistence.WorkflowExecutionAlreadyStartedError{
 		Msg:            "random message",
 		StartRequestID: "oldRequestID",
@@ -1064,7 +1062,6 @@ func (s *engine2Suite) TestStartWorkflowExecution_NotRunning_PrevSuccess() {
 	expecedErrs := []bool{true, false, true}
 
 	s.mockHistoryMgr.On("AppendHistoryEvents", mock.Anything).Return(nil).Times(len(expecedErrs))
-	s.mockShardManager.On("UpdateShard", mock.Anything).Return(nil).Times(len(expecedErrs))
 	s.mockExecutionMgr.On(
 		"CreateWorkflowExecution",
 		mock.MatchedBy(func(request *persistence.CreateWorkflowExecutionRequest) bool { return request.ContinueAsNew == false }),
@@ -1152,7 +1149,6 @@ func (s *engine2Suite) TestStartWorkflowExecution_NotRunning_PrevFail() {
 
 	for i, closeState := range closeStates {
 		s.mockHistoryMgr.On("AppendHistoryEvents", mock.Anything).Return(nil).Times(len(expecedErrs))
-		s.mockShardManager.On("UpdateShard", mock.Anything).Return(nil).Times(len(expecedErrs))
 		s.mockExecutionMgr.On(
 			"CreateWorkflowExecution",
 			mock.MatchedBy(func(request *persistence.CreateWorkflowExecutionRequest) bool { return request.ContinueAsNew == false }),
diff --git a/service/history/historyEventNotifier.go b/service/history/historyEventNotifier.go
index 88e0090b5e6..505737f13be 100644
--- a/service/history/historyEventNotifier.go
+++ b/service/history/historyEventNotifier.go
@@ -26,17 +26,13 @@ import (
 	"github.com/pborman/uuid"
 
 	gen "github.com/uber/cadence/.gen/go/shared"
+	"github.com/uber/cadence/common"
 	"github.com/uber/cadence/common/collection"
 	"github.com/uber/cadence/common/metrics"
 )
 
 const (
 	eventsChanSize = 1000
-
-	// used for workflow pubsub status
-	statusIdle    int32 = 0
-	statusStarted int32 = 1
-	statusStopped int32 = 2
 )
 
 type (
@@ -93,7 +89,7 @@ func newHistoryEventNotifier(metrics metrics.Client, workflowIDToShardID func(st
 	}
 	return &historyEventNotifierImpl{
 		metrics:    metrics,
-		status:     statusIdle,
+		status:     common.DaemonStatusInitialized,
 		closeChan:  make(chan bool),
 		eventsChan: make(chan *historyEventNotification, eventsChanSize),
@@ -213,14 +209,14 @@ func (notifier *historyEventNotifierImpl) dequeueHistoryEventNotifications() {
 }
 
 func (notifier *historyEventNotifierImpl) Start() {
-	if !atomic.CompareAndSwapInt32(&notifier.status, statusIdle, statusStarted) {
+	if !atomic.CompareAndSwapInt32(&notifier.status, common.DaemonStatusInitialized, common.DaemonStatusStarted) {
 		return
 	}
 	go notifier.dequeueHistoryEventNotifications()
 }
 
 func (notifier *historyEventNotifierImpl) Stop() {
-	if !atomic.CompareAndSwapInt32(&notifier.status, statusStarted, statusStopped) {
+	if !atomic.CompareAndSwapInt32(&notifier.status, common.DaemonStatusStarted, common.DaemonStatusStopped) {
 		return
 	}
 	close(notifier.closeChan)
diff --git a/service/history/queueProcessor.go b/service/history/queueProcessor.go
index 285328be3a0..5a0dd6403c8 100644
--- a/service/history/queueProcessor.go
+++ b/service/history/queueProcessor.go
@@ -32,6 +32,7 @@ import (
 	"github.com/uber/cadence/common"
 	"github.com/uber/cadence/common/logging"
 	"github.com/uber/cadence/common/metrics"
+	"github.com/uber/cadence/common/persistence"
 )
 
 type (
@@ -60,8 +61,7 @@ type (
 		workerNotificationChans []chan struct{}
 		notifyCh                chan struct{}
-		isStarted               int32
-		isStopped               int32
+		status                  int32
 		shutdownWG              sync.WaitGroup
 		shutdownCh              chan struct{}
 	}
 
@@ -83,6 +83,7 @@ func newQueueProcessorBase(shard ShardContext, options *QueueProcessorOptions, p
 		processor:               processor,
 		rateLimiter:             common.NewTokenBucket(options.MaxPollRPS, common.NewRealTimeSource()),
 		workerNotificationChans: workerNotificationChans,
+		status:                  common.DaemonStatusInitialized,
 		notifyCh:                make(chan struct{}, 1),
 		shutdownCh:              make(chan struct{}),
 		metricsClient:           shard.GetMetricsClient(),
@@ -94,7 +95,7 @@ func newQueueProcessorBase(shard ShardContext, options *QueueProcessorOptions, p
 }
 
 func (p *queueProcessorBase) Start() {
-	if !atomic.CompareAndSwapInt32(&p.isStarted, 0, 1) {
+	if !atomic.CompareAndSwapInt32(&p.status, common.DaemonStatusInitialized, common.DaemonStatusStarted) {
 		return
 	}
 
@@ -107,16 +108,14 @@ func (p *queueProcessorBase) Start() {
 }
 
 func (p *queueProcessorBase) Stop() {
-	if !atomic.CompareAndSwapInt32(&p.isStopped, 0, 1) {
+	if !atomic.CompareAndSwapInt32(&p.status, common.DaemonStatusStarted, common.DaemonStatusStopped) {
 		return
 	}
 
 	logging.LogQueueProcesorShuttingDownEvent(p.logger)
 	defer logging.LogQueueProcesorShutdownEvent(p.logger)
 
-	if atomic.LoadInt32(&p.isStarted) == 1 {
-		close(p.shutdownCh)
-	}
+	close(p.shutdownCh)
 
 	if success := common.AwaitWaitGroup(&p.shutdownWG, time.Minute); !success {
 		logging.LogQueueProcesorShutdownTimedoutEvent(p.logger)
@@ -151,7 +150,8 @@ processorPumpLoop:
 		case <-p.shutdownCh:
 			break processorPumpLoop
 		case <-p.ackMgr.getFinishedChan():
-			p.Stop()
+			// use a separate goroutine since the caller holds the shutdownWG
+			go p.Stop()
 		case <-p.notifyCh:
 			p.processBatch(tasksCh)
 		case <-pollTimer.C:
@@ -167,7 +167,7 @@ processorPumpLoop:
 	// This is the only pump which writes to tasksCh, so it is safe to close channel here
 	close(tasksCh)
 	if success := common.AwaitWaitGroup(&workerWG, 10*time.Second); !success {
-		p.logger.Warn("Queue processor timed out on worker shutdown.")
+		p.logger.Warn("Queue processor timedout on worker shutdown.")
 	}
 	updateAckTimer.Stop()
 	pollTimer.Stop()
@@ -210,11 +210,12 @@ func (p *queueProcessorBase) taskWorker(tasksCh <-chan queueTaskInfo, notificati
 
 	for {
 		select {
+		case <-p.shutdownCh:
+			return
 		case task, ok := <-tasksCh:
 			if !ok {
 				return
 			}
-
 			p.processWithRetry(notificationChan, task)
 		}
 	}
@@ -230,7 +231,13 @@ func (p *queueProcessorBase) retryTasks() {
 }
 
 func (p *queueProcessorBase) processWithRetry(notificationChan <-chan struct{}, task queueTaskInfo) {
-	p.logger.Debugf("Processing task: %v, type: %v", task.GetTaskID(), task.GetTaskType())
+	switch task.(type) {
+	case *persistence.TransferTaskInfo:
+		p.logger.Debugf("Processing transfer task: %v, type: %v", task.GetTaskID(), task.GetTaskType())
+	case *persistence.ReplicationTaskInfo:
+		p.logger.Debugf("Processing replication task: %v, type: %v", task.GetTaskID(), task.GetTaskType())
+	}
+
 ProcessRetryLoop:
 	for retryCount := 1; retryCount <= p.options.MaxRetryCount; {
 		select {
@@ -260,6 +267,12 @@ ProcessRetryLoop:
 	}
 
 	// All attempts to process transfer task failed. We won't be able to move the ackLevel so panic
-	logging.LogOperationPanicEvent(p.logger,
-		fmt.Sprintf("Retry count exceeded for taskID: %v", task.GetTaskID()), nil)
+	switch task.(type) {
+	case *persistence.TransferTaskInfo:
+		logging.LogOperationPanicEvent(p.logger,
+			fmt.Sprintf("Retry count exceeded for transfer taskID: %v", task.GetTaskID()), nil)
+	case *persistence.ReplicationTaskInfo:
+		logging.LogOperationPanicEvent(p.logger,
+			fmt.Sprintf("Retry count exceeded for replication taskID: %v", task.GetTaskID()), nil)
+	}
 }
diff --git a/service/history/shardContext.go b/service/history/shardContext.go
index 94969a31d0d..403216ece60 100644
--- a/service/history/shardContext.go
+++ b/service/history/shardContext.go
@@ -262,7 +262,9 @@ Create_Loop:
 		response, err := s.executionManager.CreateWorkflowExecution(request)
 		if err != nil {
 			switch err.(type) {
-			case *shared.WorkflowExecutionAlreadyStartedError, *shared.ServiceBusyError:
+			case *shared.WorkflowExecutionAlreadyStartedError,
+				*persistence.WorkflowExecutionAlreadyStartedError,
+				*shared.ServiceBusyError:
 				// No special handling required for these errors
 			case *persistence.ShardOwnershipLostError:
 				{
diff --git a/service/history/timerQueueProcessorBase.go b/service/history/timerQueueProcessorBase.go
index d910656b24b..c1e0e125463 100644
--- a/service/history/timerQueueProcessorBase.go
+++ b/service/history/timerQueueProcessorBase.go
@@ -22,6 +22,7 @@ package history
 
 import (
 	"errors"
+	"fmt"
 	"math"
 	"sync"
 	"sync/atomic"
@@ -50,8 +51,7 @@ type (
 		historyService   *historyEngineImpl
 		cache            *historyCache
 		executionManager persistence.ExecutionManager
-		isStarted        int32
-		isStopped        int32
+		status           int32
 		shutdownWG       sync.WaitGroup
 		shutdownCh       chan struct{}
 		tasksCh          chan *persistence.TimerTaskInfo
@@ -88,6 +88,7 @@ func newTimerQueueProcessorBase(shard ShardContext, historyService *historyEngin
 		historyService:   historyService,
 		cache:            historyService.historyCache,
 		executionManager: shard.GetExecutionManager(),
+		status:           common.DaemonStatusInitialized,
 		shutdownCh:       make(chan struct{}),
 		tasksCh:          make(chan *persistence.TimerTaskInfo, 10*shard.GetConfig().TimerTaskBatchSize),
 		config:           shard.GetConfig(),
@@ -103,7 +104,7 @@ func newTimerQueueProcessorBase(shard ShardContext, historyService *historyEngin
 }
 
 func (t *timerQueueProcessorBase) Start() {
-	if !atomic.CompareAndSwapInt32(&t.isStarted, 0, 1) {
+	if !atomic.CompareAndSwapInt32(&t.status, common.DaemonStatusInitialized, common.DaemonStatusStarted) {
 		return
 	}
 
@@ -114,16 +115,14 @@ func (t *timerQueueProcessorBase) Start() {
 }
 
 func (t *timerQueueProcessorBase) Stop() {
-	if !atomic.CompareAndSwapInt32(&t.isStopped, 0, 1) {
+	if !atomic.CompareAndSwapInt32(&t.status, common.DaemonStatusStarted, common.DaemonStatusStopped) {
 		return
 	}
 
-	if atomic.LoadInt32(&t.isStarted) == 1 {
-		close(t.shutdownCh)
-	}
+	close(t.shutdownCh)
 
 	if success := common.AwaitWaitGroup(&t.shutdownWG, time.Minute); !success {
-		t.logger.Warn("Timer queue processor timed out on shutdown.")
+		t.logger.Warn("Timer queue processor timedout on shutdown.")
 	}
 
 	t.logger.Info("Timer queue processor stopped.")
@@ -132,24 +131,17 @@ func (t *timerQueueProcessorBase) Stop() {
 func (t *timerQueueProcessorBase) processorPump() {
 	defer t.shutdownWG.Done()
 
-	// Workers to process timer tasks that are expired.
-
 	var workerWG sync.WaitGroup
 	for i := 0; i < t.config.TimerTaskWorkerCount; i++ {
 		workerWG.Add(1)
 		notificationChan := t.workerNotificationChans[i]
-		go t.processTaskWorker(&workerWG, notificationChan)
+		go t.taskWorker(&workerWG, notificationChan)
 	}
 
 RetryProcessor:
 	for {
 		select {
 		case <-t.shutdownCh:
-			t.logger.Info("Timer queue processor pump shutting down.")
-			close(t.tasksCh)
-			if success := common.AwaitWaitGroup(&workerWG, 10*time.Second); !success {
-				t.logger.Warn("Timer queue processor timed out on worker shutdown.")
-			}
 			break RetryProcessor
 		default:
 			err := t.internalProcessor()
@@ -158,14 +150,23 @@ RetryProcessor:
 			}
 		}
 	}
+
+	t.logger.Info("Timer queue processor pump shutting down.")
+	// This is the only pump which writes to tasksCh, so it is safe to close channel here
+	close(t.tasksCh)
+	if success := common.AwaitWaitGroup(&workerWG, 10*time.Second); !success {
+		t.logger.Warn("Timer queue processor timedout on worker shutdown.")
+	}
 	t.logger.Info("Timer processor exiting.")
 }
 
-func (t *timerQueueProcessorBase) processTaskWorker(workerWG *sync.WaitGroup, notificationChan chan struct{}) {
+func (t *timerQueueProcessorBase) taskWorker(workerWG *sync.WaitGroup, notificationChan chan struct{}) {
 	defer workerWG.Done()
 
 	for {
 		select {
+		case <-t.shutdownCh:
+			return
 		case task, ok := <-t.tasksCh:
 			if !ok {
 				return
@@ -176,6 +177,7 @@ func (t *timerQueueProcessorBase) processTaskWorker(workerWG *sync.WaitGroup, no
 }
 
 func (t *timerQueueProcessorBase) processWithRetry(notificationChan <-chan struct{}, task *persistence.TimerTaskInfo) {
+	t.logger.Debugf("Processing timer task: %v, type: %v", task.GetTaskID(), task.GetTaskType())
 ProcessRetryLoop:
 	for attempt := 1; attempt <= t.config.TimerTaskMaxRetryCount; {
 		select {
@@ -204,6 +206,9 @@ ProcessRetryLoop:
 			return
 		}
 	}
+	// All attempts to process timer task failed. We won't be able to move the ackLevel so panic
+	logging.LogOperationPanicEvent(t.logger,
+		fmt.Sprintf("Retry count exceeded for timer taskID: %v", task.GetTaskID()), nil)
 }
 
 // NotifyNewTimers - Notify the processor about the new timer events arrival.
@@ -270,28 +275,23 @@ continueProcessor:
 		// 4. updating ack level
 		//
 		select {
-
 		case <-t.shutdownCh:
 			t.logger.Debug("Timer queue processor pump shutting down.")
 			return nil
-
 		case <-t.timerQueueAckMgr.getFinishedChan():
 			// timer queue ack manager indicate that all task scanned
 			// are finished and no more tasks
-			t.Stop()
+			// use a separate goroutine since the caller holds the shutdownWG
+			go t.Stop()
 			return nil
-
 		case <-timerGate.FireChan():
 			// Timer Fired.
-
 		case <-pollTimer.C:
 			// forced timer scan
 			pollTimer.Reset(t.config.TimerProcessorMaxPollInterval)
-
 		case <-updateAckChan:
 			t.timerQueueAckMgr.updateAckLevel()
 			continue continueProcessor
-
 		case <-t.newTimerCh:
 			t.newTimeLock.Lock()
 			newTime := t.newTime
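
For reference, the lifecycle pattern these changes converge on can be summarized in a minimal, self-contained Go sketch. This is not code from the diff: the exampleDaemon type and the local daemonStatus* constants are illustrative stand-ins for the new common.DaemonStatus* constants and the processor structs above.

// Illustrative sketch only (not part of this diff). It mirrors the pattern above:
// a single status field driven Initialized -> Started -> Stopped via CAS, and a
// pump that must call Stop on a separate goroutine because Stop waits on the
// WaitGroup the pump holds.
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

const (
	daemonStatusInitialized int32 = 0 // stand-in for common.DaemonStatusInitialized
	daemonStatusStarted     int32 = 1 // stand-in for common.DaemonStatusStarted
	daemonStatusStopped     int32 = 2 // stand-in for common.DaemonStatusStopped
)

// exampleDaemon is a hypothetical background component shaped like
// queueProcessorBase / timerQueueProcessorBase after this change.
type exampleDaemon struct {
	status     int32
	shutdownCh chan struct{}
	shutdownWG sync.WaitGroup
}

func newExampleDaemon() *exampleDaemon {
	return &exampleDaemon{
		status:     daemonStatusInitialized,
		shutdownCh: make(chan struct{}),
	}
}

// Start transitions Initialized -> Started exactly once; duplicate calls are no-ops.
func (d *exampleDaemon) Start() {
	if !atomic.CompareAndSwapInt32(&d.status, daemonStatusInitialized, daemonStatusStarted) {
		return
	}
	d.shutdownWG.Add(1)
	go d.pump()
}

// Stop transitions Started -> Stopped exactly once, closes shutdownCh unconditionally
// (the CAS only succeeds if Start ran, so the old isStarted check is unnecessary),
// then waits for the pump to drain. A pump that wants to stop itself must call Stop
// on a separate goroutine, mirroring the `go p.Stop()` / `go t.Stop()` calls above.
func (d *exampleDaemon) Stop() {
	if !atomic.CompareAndSwapInt32(&d.status, daemonStatusStarted, daemonStatusStopped) {
		return
	}
	close(d.shutdownCh)
	d.shutdownWG.Wait()
}

func (d *exampleDaemon) pump() {
	defer d.shutdownWG.Done()
	<-d.shutdownCh // a real pump would also select on its work channels
	fmt.Println("pump exiting")
}

func main() {
	d := newExampleDaemon()
	d.Start()
	d.Stop()
	d.Stop() // no-op: the CAS guard makes Stop idempotent
}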