diff --git a/service/history/queueProcessor.go b/service/history/queueProcessor.go index 51629a39612..2c898dfac5b 100644 --- a/service/history/queueProcessor.go +++ b/service/history/queueProcessor.go @@ -246,7 +246,7 @@ func (p *queueProcessorBase) processWithRetry(notificationChan <-chan struct{}, case *persistence.ReplicationTaskInfo: p.logger.Debugf("Processing replication task: %v, type: %v", task.GetTaskID(), task.GetTaskType()) } - + var err error ProcessRetryLoop: for retryCount := 1; retryCount <= p.options.MaxRetryCount(); { select { @@ -259,7 +259,7 @@ ProcessRetryLoop: default: } - err := p.processor.process(task) + err = p.processor.process(task) if err != nil { if err == ErrTaskRetry { <-notificationChan @@ -279,9 +279,9 @@ ProcessRetryLoop: switch task.(type) { case *persistence.TransferTaskInfo: logging.LogOperationPanicEvent(p.logger, - fmt.Sprintf("Retry count exceeded for transfer taskID: %v", task.GetTaskID()), nil) + fmt.Sprintf("Retry count exceeded for transfer taskID: %v", task.GetTaskID()), err) case *persistence.ReplicationTaskInfo: logging.LogOperationPanicEvent(p.logger, - fmt.Sprintf("Retry count exceeded for replication taskID: %v", task.GetTaskID()), nil) + fmt.Sprintf("Retry count exceeded for replication taskID: %v", task.GetTaskID()), err) } } diff --git a/service/history/service.go b/service/history/service.go index 39379f1a9ce..87f339b9aef 100644 --- a/service/history/service.go +++ b/service/history/service.go @@ -106,7 +106,7 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int) *Config { AcquireShardInterval: dc.GetDurationProperty(dynamicconfig.AcquireShardInterval, time.Minute), TimerTaskBatchSize: dc.GetIntProperty(dynamicconfig.TimerTaskBatchSize, 100), TimerTaskWorkerCount: dc.GetIntProperty(dynamicconfig.TimerTaskWorkerCount, 10), - TimerTaskMaxRetryCount: dc.GetIntProperty(dynamicconfig.TimerTaskMaxRetryCount, 5), + TimerTaskMaxRetryCount: dc.GetIntProperty(dynamicconfig.TimerTaskMaxRetryCount, 100), TimerProcessorGetFailureRetryCount: dc.GetIntProperty(dynamicconfig.TimerProcessorGetFailureRetryCount, 5), TimerProcessorCompleteTimerFailureRetryCount: dc.GetIntProperty(dynamicconfig.TimerProcessorCompleteTimerFailureRetryCount, 10), TimerProcessorUpdateShardTaskCount: dc.GetIntProperty(dynamicconfig.TimerProcessorUpdateShardTaskCount, 100), diff --git a/service/history/timerQueueProcessorBase.go b/service/history/timerQueueProcessorBase.go index ceee3048948..a4df5036036 100644 --- a/service/history/timerQueueProcessorBase.go +++ b/service/history/timerQueueProcessorBase.go @@ -182,6 +182,7 @@ func (t *timerQueueProcessorBase) taskWorker(workerWG *sync.WaitGroup, notificat func (t *timerQueueProcessorBase) processWithRetry(notificationChan <-chan struct{}, task *persistence.TimerTaskInfo) { t.logger.Debugf("Processing timer task: %v, type: %v", task.GetTaskID(), task.GetTaskType()) + var err error ProcessRetryLoop: for attempt := 1; attempt <= t.config.TimerTaskMaxRetryCount(); { select { @@ -194,7 +195,7 @@ ProcessRetryLoop: default: } - err := t.timerProcessor.process(task) + err = t.timerProcessor.process(task) if err != nil { if err == ErrTaskRetry { <-notificationChan @@ -212,7 +213,7 @@ ProcessRetryLoop: } // All attempts to process transfer task failed. We won't be able to move the ackLevel so panic logging.LogOperationPanicEvent(t.logger, - fmt.Sprintf("Retry count exceeded for timer taskID: %v", task.GetTaskID()), nil) + fmt.Sprintf("Retry count exceeded for timer taskID: %v", task.GetTaskID()), err) } // NotifyNewTimers - Notify the processor about the new timer events arrival.