Skip to content

Commit

Permalink
retry timer task up to 100 times (#872)
Browse files Browse the repository at this point in the history
  • Loading branch information
wxing1292 authored Jun 21, 2018
1 parent 8510333 commit 36c617d
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 7 deletions.
8 changes: 4 additions & 4 deletions service/history/queueProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ func (p *queueProcessorBase) processWithRetry(notificationChan <-chan struct{},
case *persistence.ReplicationTaskInfo:
p.logger.Debugf("Processing replication task: %v, type: %v", task.GetTaskID(), task.GetTaskType())
}

var err error
ProcessRetryLoop:
for retryCount := 1; retryCount <= p.options.MaxRetryCount(); {
select {
Expand All @@ -259,7 +259,7 @@ ProcessRetryLoop:
default:
}

err := p.processor.process(task)
err = p.processor.process(task)
if err != nil {
if err == ErrTaskRetry {
<-notificationChan
Expand All @@ -279,9 +279,9 @@ ProcessRetryLoop:
switch task.(type) {
case *persistence.TransferTaskInfo:
logging.LogOperationPanicEvent(p.logger,
fmt.Sprintf("Retry count exceeded for transfer taskID: %v", task.GetTaskID()), nil)
fmt.Sprintf("Retry count exceeded for transfer taskID: %v", task.GetTaskID()), err)
case *persistence.ReplicationTaskInfo:
logging.LogOperationPanicEvent(p.logger,
fmt.Sprintf("Retry count exceeded for replication taskID: %v", task.GetTaskID()), nil)
fmt.Sprintf("Retry count exceeded for replication taskID: %v", task.GetTaskID()), err)
}
}
2 changes: 1 addition & 1 deletion service/history/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int) *Config {
AcquireShardInterval: dc.GetDurationProperty(dynamicconfig.AcquireShardInterval, time.Minute),
TimerTaskBatchSize: dc.GetIntProperty(dynamicconfig.TimerTaskBatchSize, 100),
TimerTaskWorkerCount: dc.GetIntProperty(dynamicconfig.TimerTaskWorkerCount, 10),
TimerTaskMaxRetryCount: dc.GetIntProperty(dynamicconfig.TimerTaskMaxRetryCount, 5),
TimerTaskMaxRetryCount: dc.GetIntProperty(dynamicconfig.TimerTaskMaxRetryCount, 100),
TimerProcessorGetFailureRetryCount: dc.GetIntProperty(dynamicconfig.TimerProcessorGetFailureRetryCount, 5),
TimerProcessorCompleteTimerFailureRetryCount: dc.GetIntProperty(dynamicconfig.TimerProcessorCompleteTimerFailureRetryCount, 10),
TimerProcessorUpdateShardTaskCount: dc.GetIntProperty(dynamicconfig.TimerProcessorUpdateShardTaskCount, 100),
Expand Down
5 changes: 3 additions & 2 deletions service/history/timerQueueProcessorBase.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ func (t *timerQueueProcessorBase) taskWorker(workerWG *sync.WaitGroup, notificat

func (t *timerQueueProcessorBase) processWithRetry(notificationChan <-chan struct{}, task *persistence.TimerTaskInfo) {
t.logger.Debugf("Processing timer task: %v, type: %v", task.GetTaskID(), task.GetTaskType())
var err error
ProcessRetryLoop:
for attempt := 1; attempt <= t.config.TimerTaskMaxRetryCount(); {
select {
Expand All @@ -194,7 +195,7 @@ ProcessRetryLoop:
default:
}

err := t.timerProcessor.process(task)
err = t.timerProcessor.process(task)
if err != nil {
if err == ErrTaskRetry {
<-notificationChan
Expand All @@ -212,7 +213,7 @@ ProcessRetryLoop:
}
// All attempts to process transfer task failed. We won't be able to move the ackLevel so panic
logging.LogOperationPanicEvent(t.logger,
fmt.Sprintf("Retry count exceeded for timer taskID: %v", task.GetTaskID()), nil)
fmt.Sprintf("Retry count exceeded for timer taskID: %v", task.GetTaskID()), err)
}

// NotifyNewTimers - Notify the processor about the new timer events arrival.
Expand Down

0 comments on commit 36c617d

Please sign in to comment.