diff --git a/service/history/nDCStandbyTaskUtil.go b/service/history/nDCStandbyTaskUtil.go index 89999a22409..8b140451e95 100644 --- a/service/history/nDCStandbyTaskUtil.go +++ b/service/history/nDCStandbyTaskUtil.go @@ -110,7 +110,7 @@ type ( workflowTaskPostActionInfo struct { *historyResendInfo - workflowTaskScheduleToStartTimeout int64 + workflowTaskScheduleToStartTimeout *time.Duration taskqueue taskqueuepb.TaskQueue } @@ -171,7 +171,7 @@ func newActivityRetryTimePostActionInfo( func newWorkflowTaskPostActionInfo( mutableState workflow.MutableState, - workflowTaskScheduleToStartTimeout int64, + workflowTaskScheduleToStartTimeout *time.Duration, taskqueue taskqueuepb.TaskQueue, ) (*workflowTaskPostActionInfo, error) { resendInfo, err := getHistoryResendInfo(mutableState) diff --git a/service/history/transferQueueActiveTaskExecutor.go b/service/history/transferQueueActiveTaskExecutor.go index 8d3881629b8..9f886ae7a3f 100644 --- a/service/history/transferQueueActiveTaskExecutor.go +++ b/service/history/transferQueueActiveTaskExecutor.go @@ -243,7 +243,7 @@ func (t *transferQueueActiveTaskExecutor) processWorkflowTask( // the correct logic should check whether the workflow task is a sticky workflowTask // task or not. var taskQueue *taskqueuepb.TaskQueue - taskScheduleToStartTimeoutSeconds := int64(0) + var taskScheduleToStartTimeout *time.Duration if mutableState.GetExecutionInfo().TaskQueue != task.TaskQueue { // this workflowTask is an sticky workflowTask // there shall already be an timer set @@ -251,14 +251,14 @@ func (t *transferQueueActiveTaskExecutor) processWorkflowTask( Name: task.TaskQueue, Kind: enumspb.TASK_QUEUE_KIND_STICKY, } - taskScheduleToStartTimeoutSeconds = int64(timestamp.DurationValue(executionInfo.StickyScheduleToStartTimeout).Seconds()) + taskScheduleToStartTimeout = executionInfo.StickyScheduleToStartTimeout } else { taskQueue = &taskqueuepb.TaskQueue{ Name: task.TaskQueue, Kind: enumspb.TASK_QUEUE_KIND_NORMAL, } - workflowRunTimeout := timestamp.DurationValue(executionInfo.WorkflowRunTimeout) - taskScheduleToStartTimeoutSeconds = int64(workflowRunTimeout.Round(time.Second).Seconds()) + workflowRunTimeout := executionInfo.WorkflowRunTimeout + taskScheduleToStartTimeout = workflowRunTimeout } originalTaskQueue := mutableState.GetExecutionInfo().TaskQueue @@ -267,7 +267,7 @@ func (t *transferQueueActiveTaskExecutor) processWorkflowTask( // the rest of logic is making RPC call, which takes time. release(nil) - err = t.pushWorkflowTask(ctx, task, taskQueue, timestamp.DurationFromSeconds(taskScheduleToStartTimeoutSeconds)) + err = t.pushWorkflowTask(ctx, task, taskQueue, taskScheduleToStartTimeout) if _, ok := err.(*serviceerrors.StickyWorkerUnavailable); ok { // sticky worker is unavailable, switch to original task queue @@ -282,7 +282,7 @@ func (t *transferQueueActiveTaskExecutor) processWorkflowTask( // There is no need to reset sticky, because if this task is picked by new worker, the new worker will reset // the sticky queue to a new one. However, if worker is completely down, that schedule_to_start timeout task // will re-create a new non-sticky task and reset sticky. - err = t.pushWorkflowTask(ctx, task, taskQueue, timestamp.DurationFromSeconds(taskScheduleToStartTimeoutSeconds)) + err = t.pushWorkflowTask(ctx, task, taskQueue, taskScheduleToStartTimeout) } return err } diff --git a/service/history/transferQueueActiveTaskExecutor_test.go b/service/history/transferQueueActiveTaskExecutor_test.go index 83c362b441c..2f0ccb7fc01 100644 --- a/service/history/transferQueueActiveTaskExecutor_test.go +++ b/service/history/transferQueueActiveTaskExecutor_test.go @@ -2638,10 +2638,10 @@ func (s *transferQueueActiveTaskExecutorSuite) createAddWorkflowTaskRequest( Kind: enumspb.TASK_QUEUE_KIND_NORMAL, } executionInfo := mutableState.GetExecutionInfo() - timeout := timestamp.DurationValue(executionInfo.WorkflowRunTimeout) + timeout := executionInfo.WorkflowRunTimeout if mutableState.GetExecutionInfo().TaskQueue != task.TaskQueue { taskQueue.Kind = enumspb.TASK_QUEUE_KIND_STICKY - timeout = timestamp.DurationValue(executionInfo.StickyScheduleToStartTimeout) + timeout = executionInfo.StickyScheduleToStartTimeout } return &matchingservice.AddWorkflowTaskRequest{ @@ -2652,7 +2652,7 @@ func (s *transferQueueActiveTaskExecutorSuite) createAddWorkflowTaskRequest( }, TaskQueue: taskQueue, ScheduledEventId: task.ScheduledEventID, - ScheduleToStartTimeout: &timeout, + ScheduleToStartTimeout: timeout, Clock: vclock.NewVectorClock(s.mockClusterMetadata.GetClusterID(), s.mockShard.GetShardID(), task.TaskID), } } diff --git a/service/history/transferQueueStandbyTaskExecutor.go b/service/history/transferQueueStandbyTaskExecutor.go index c496d820e68..6b8c55c22b7 100644 --- a/service/history/transferQueueStandbyTaskExecutor.go +++ b/service/history/transferQueueStandbyTaskExecutor.go @@ -175,7 +175,6 @@ func (t *transferQueueStandbyTaskExecutor) processWorkflowTask( ctx context.Context, transferTask *tasks.WorkflowTask, ) error { - processTaskIfClosed := false actionFn := func(_ context.Context, wfContext workflow.Context, mutableState workflow.MutableState) (interface{}, error) { wtInfo := mutableState.GetWorkflowTaskByID(transferTask.ScheduledEventID) if wtInfo == nil { @@ -189,13 +188,13 @@ func (t *transferQueueStandbyTaskExecutor) processWorkflowTask( Name: mutableState.GetExecutionInfo().TaskQueue, Kind: enumspb.TASK_QUEUE_KIND_NORMAL, } - workflowRunTimeout := timestamp.DurationValue(executionInfo.WorkflowRunTimeout) - taskScheduleToStartTimeoutSeconds := int64(workflowRunTimeout.Round(time.Second).Seconds()) + workflowRunTimeout := executionInfo.WorkflowRunTimeout + taskScheduleToStartTimeout := workflowRunTimeout if mutableState.GetExecutionInfo().TaskQueue != transferTask.TaskQueue { // Experimental: try to push sticky task as regular task with sticky timeout as TTL. // workflow might be sticky before namespace become standby // there shall already be a schedule_to_start timer created - taskScheduleToStartTimeoutSeconds = int64(timestamp.DurationValue(executionInfo.StickyScheduleToStartTimeout).Seconds()) + taskScheduleToStartTimeout = executionInfo.StickyScheduleToStartTimeout } err := CheckTaskVersion(t.shard, t.logger, mutableState.GetNamespaceEntry(), wtInfo.Version, transferTask.Version, transferTask) if err != nil { @@ -205,7 +204,7 @@ func (t *transferQueueStandbyTaskExecutor) processWorkflowTask( if wtInfo.StartedEventID == common.EmptyEventID { return newWorkflowTaskPostActionInfo( mutableState, - taskScheduleToStartTimeoutSeconds, + taskScheduleToStartTimeout, *taskQueue, ) } @@ -215,7 +214,7 @@ func (t *transferQueueStandbyTaskExecutor) processWorkflowTask( return t.processTransfer( ctx, - processTaskIfClosed, + false, transferTask, actionFn, getStandbyPostActionFn( @@ -575,7 +574,7 @@ func (t *transferQueueStandbyTaskExecutor) pushWorkflowTask( ctx, task.(*tasks.WorkflowTask), &pushwtInfo.taskqueue, - timestamp.DurationFromSeconds(timeout), + timeout, ) }