diff --git a/service/history/conflictResolver_test.go b/service/history/conflictResolver_test.go index dcf5b34d110..fb5bd4c5ad3 100644 --- a/service/history/conflictResolver_test.go +++ b/service/history/conflictResolver_test.go @@ -23,6 +23,7 @@ package history import ( "os" "testing" + "time" "github.com/pborman/uuid" log "github.com/sirupsen/logrus" @@ -185,7 +186,7 @@ func (s *conflictResolverSuite) TestGetHistory() { func (s *conflictResolverSuite) TestReset() { sourceCluster := "some random source cluster" - startTime := common.NewRealTimeSource().Now() + startTime := time.Now() domainID := s.mockContext.domainID execution := s.mockContext.workflowExecution nextEventID := int64(2) diff --git a/service/history/historyEngine.go b/service/history/historyEngine.go index 377c672a536..b5bcd0991ce 100644 --- a/service/history/historyEngine.go +++ b/service/history/historyEngine.go @@ -356,8 +356,7 @@ func (e *historyEngineImpl) StartWorkflowExecution(startRequest *h.StartWorkflow replicationTasks = append(replicationTasks, replicationTask) } } - setTaskVersion(msBuilder.GetCurrentVersion(), transferTasks, timerTasks) - setTransferTaskTimestamp(common.NewRealTimeSource().Now(), transferTasks) + setTaskInfo(msBuilder.GetCurrentVersion(), time.Now().UTC(), transferTasks, timerTasks) createWorkflow := func(isBrandNew bool, prevRunID string) (string, error) { _, err = e.shard.CreateWorkflowExecution(&persistence.CreateWorkflowExecutionRequest{ @@ -1999,8 +1998,7 @@ func (e *historyEngineImpl) SignalWithStartWorkflowExecution(ctx context.Context replicationTasks = append(replicationTasks, replicationTask) } } - setTaskVersion(msBuilder.GetCurrentVersion(), transferTasks, timerTasks) - setTransferTaskTimestamp(common.NewRealTimeSource().Now(), transferTasks) + setTaskInfo(msBuilder.GetCurrentVersion(), time.Now().UTC(), transferTasks, timerTasks) createWorkflow := func(isBrandNew bool, prevRunID string) (string, error) { _, err = e.shard.CreateWorkflowExecution(&persistence.CreateWorkflowExecutionRequest{ @@ -2761,17 +2759,13 @@ func getStartRequest(domainID string, return startRequest } -func setTaskVersion(version int64, transferTasks []persistence.Task, timerTasks []persistence.Task) { +func setTaskInfo(version int64, timestamp time.Time, transferTasks []persistence.Task, timerTasks []persistence.Task) { + // set both the task version, as well as the timestamp on the transfer tasks for _, task := range transferTasks { task.SetVersion(version) + task.SetVisibilityTimestamp(timestamp) } for _, task := range timerTasks { task.SetVersion(version) } } - -func setTransferTaskTimestamp(timestamp time.Time, transferTasks []persistence.Task) { - for _, task := range transferTasks { - task.SetVisibilityTimestamp(timestamp) - } -} diff --git a/service/history/historyEngine_test.go b/service/history/historyEngine_test.go index bf042007322..78540b419bd 100644 --- a/service/history/historyEngine_test.go +++ b/service/history/historyEngine_test.go @@ -321,13 +321,13 @@ func (s *engineSuite) TestGetMutableStateLongPoll() { // long poll, new event happen before long poll timeout go asycWorkflowUpdate(time.Second * 2) - start := common.NewRealTimeSource().Now() + start := time.Now() response, err = s.mockHistoryEngine.GetMutableState(ctx, &history.GetMutableStateRequest{ DomainUUID: common.StringPtr(domainID), Execution: &execution, ExpectedNextEventId: common.Int64Ptr(4), }) - s.True(common.NewRealTimeSource().Now().After(start.Add(time.Second * 1))) + s.True(time.Now().After(start.Add(time.Second * 1))) s.Nil(err) s.Equal(int64(5), *response.NextEventId) } diff --git a/service/history/historyEventNotifier.go b/service/history/historyEventNotifier.go index 124a6fd0bfa..505737f13be 100644 --- a/service/history/historyEventNotifier.go +++ b/service/history/historyEventNotifier.go @@ -177,7 +177,7 @@ func (notifier *historyEventNotifierImpl) dispatchHistoryEventNotification(event func (notifier *historyEventNotifierImpl) enqueueHistoryEventNotification(event *historyEventNotification) { // set the timestamp just before enqueuing the event - event.timestamp = common.NewRealTimeSource().Now() + event.timestamp = time.Now() select { case notifier.eventsChan <- event: default: diff --git a/service/history/historyReplicator.go b/service/history/historyReplicator.go index 5cc97546937..e78e1ce96c0 100644 --- a/service/history/historyReplicator.go +++ b/service/history/historyReplicator.go @@ -548,8 +548,12 @@ func (r *historyReplicator) replicateWorkflowStarted(ctx context.Context, contex } transferTasks := sBuilder.getTransferTasks() timerTasks := sBuilder.getTimerTasks() - setTaskVersion(msBuilder.GetCurrentVersion(), transferTasks, timerTasks) - setTransferTaskTimestamp(common.NewFakeTimeSource().Update(time.Unix(0, lastEvent.GetTimestamp())).Now(), transferTasks) + setTaskInfo( + msBuilder.GetCurrentVersion(), + common.NewFakeTimeSource().Update(time.Unix(0, lastEvent.GetTimestamp())).Now(), + transferTasks, + timerTasks, + ) createWorkflow := func(isBrandNew bool, prevRunID string) error { _, err = r.shard.CreateWorkflowExecution(&persistence.CreateWorkflowExecutionRequest{ diff --git a/service/history/historyReplicator_test.go b/service/history/historyReplicator_test.go index f7017b148e4..1817263f2cb 100644 --- a/service/history/historyReplicator_test.go +++ b/service/history/historyReplicator_test.go @@ -25,6 +25,7 @@ import ( "os" "reflect" "testing" + "time" "github.com/pborman/uuid" log "github.com/sirupsen/logrus" @@ -309,7 +310,7 @@ func (s *historyReplicatorSuite) TestApplyOtherEventsVersionChecking_IncomingGre }}, History: &shared.History{}, } - startTimeStamp := common.NewRealTimeSource().Now() + startTimeStamp := time.Now().UTC() msBuilderIn.On("GetReplicationState").Return(&persistence.ReplicationState{ LastWriteVersion: currentLastWriteVersion, LastWriteEventID: currentLastEventID, @@ -359,7 +360,7 @@ func (s *historyReplicatorSuite) TestApplyOtherEventsVersionChecking_IncomingGre }}, History: &shared.History{}, } - startTimeStamp := common.NewRealTimeSource().Now() + startTimeStamp := time.Now().UTC() msBuilderIn.On("GetReplicationState").Return(&persistence.ReplicationState{ LastWriteVersion: currentLastWriteVersion, LastWriteEventID: currentLastEventID, @@ -398,7 +399,7 @@ func (s *historyReplicatorSuite) TestApplyOtherEventsVersionChecking_IncomingGre }}, History: &shared.History{}, } - startTimeStamp := common.NewRealTimeSource().Now() + startTimeStamp := time.Now().UTC() msBuilderIn.On("GetReplicationState").Return(&persistence.ReplicationState{ LastWriteVersion: currentLastWriteVersion, LastWriteEventID: currentLastEventID, @@ -625,7 +626,7 @@ func (s *historyReplicatorSuite) TestReplicateWorkflowStarted_BrandNew() { } sBuilder := &mockStateBuilder{} requestID := uuid.New() - now := common.NewRealTimeSource().Now() + now := time.Now().UTC() history := &shared.History{ Events: []*shared.HistoryEvent{ &shared.HistoryEvent{Version: common.Int64Ptr(version), EventId: common.Int64Ptr(1), Timestamp: common.Int64Ptr(now.UnixNano())}, @@ -1000,7 +1001,7 @@ func (s *historyReplicatorSuite) TestReplicateWorkflowStarted_CurrentComplete_In } sBuilder := &mockStateBuilder{} requestID := uuid.New() - now := common.NewRealTimeSource().Now() + now := time.Now().UTC() history := &shared.History{ Events: []*shared.HistoryEvent{ &shared.HistoryEvent{Version: common.Int64Ptr(version), EventId: common.Int64Ptr(1), Timestamp: common.Int64Ptr(now.UnixNano())}, @@ -1154,7 +1155,7 @@ func (s *historyReplicatorSuite) TestReplicateWorkflowStarted_CurrentComplete_In } sBuilder := &mockStateBuilder{} requestID := uuid.New() - now := common.NewRealTimeSource().Now() + now := time.Now().UTC() history := &shared.History{ Events: []*shared.HistoryEvent{ &shared.HistoryEvent{Version: common.Int64Ptr(version), EventId: common.Int64Ptr(1), Timestamp: common.Int64Ptr(now.UnixNano())}, @@ -1508,7 +1509,7 @@ func (s *historyReplicatorSuite) TestReplicateWorkflowStarted_CurrentRunning_Inc } sBuilder := &mockStateBuilder{} requestID := uuid.New() - now := common.NewRealTimeSource().Now() + now := time.Now().UTC() history := &shared.History{ Events: []*shared.HistoryEvent{ &shared.HistoryEvent{Version: common.Int64Ptr(version), EventId: common.Int64Ptr(1), Timestamp: common.Int64Ptr(now.UnixNano())}, diff --git a/service/history/historyTestBase.go b/service/history/historyTestBase.go index 3b3ba2099ec..c323bf98728 100644 --- a/service/history/historyTestBase.go +++ b/service/history/historyTestBase.go @@ -279,7 +279,7 @@ func (s *TestShardContext) UpdateWorkflowExecution(request *persistence.UpdateWo } task.SetTaskID(seqID) s.logger.Infof("%v: TestShardContext: Assigning timer (timestamp: %v, seq: %v)", - common.NewRealTimeSource().Now(), persistence.GetVisibilityTSFrom(task).UTC(), task.GetTaskID()) + time.Now(), persistence.GetVisibilityTSFrom(task).UTC(), task.GetTaskID()) } return s.executionMgr.UpdateWorkflowExecution(request) } @@ -351,7 +351,7 @@ func (s *TestShardContext) GetCurrentTime(cluster string) time.Time { if cluster != s.GetService().GetClusterMetadata().GetCurrentClusterName() { return s.standbyClusterCurrentTime[cluster] } - return common.NewRealTimeSource().Now() + return time.Now() } // SetupWorkflowStoreWithOptions to setup workflow test base diff --git a/service/history/mutableStateBuilder.go b/service/history/mutableStateBuilder.go index 28aa0890fa1..2ff27fada78 100644 --- a/service/history/mutableStateBuilder.go +++ b/service/history/mutableStateBuilder.go @@ -827,7 +827,7 @@ func (e *mutableStateBuilder) IsStickyTaskListEnabled() bool { } func (e *mutableStateBuilder) CreateNewHistoryEvent(eventType workflow.EventType) *workflow.HistoryEvent { - return e.CreateNewHistoryEventWithTimestamp(eventType, common.NewRealTimeSource().Now().UnixNano()) + return e.CreateNewHistoryEventWithTimestamp(eventType, time.Now().UnixNano()) } func (e *mutableStateBuilder) CreateNewHistoryEventWithTimestamp(eventType workflow.EventType, @@ -1057,7 +1057,7 @@ func (e *mutableStateBuilder) HasParentExecution() bool { func (e *mutableStateBuilder) UpdateActivityProgress(ai *persistence.ActivityInfo, request *workflow.RecordActivityTaskHeartbeatRequest) { ai.Details = request.Details - ai.LastHeartBeatUpdatedTime = common.NewRealTimeSource().Now() + ai.LastHeartBeatUpdatedTime = time.Now() e.updateActivityInfos[ai] = struct{}{} } @@ -1459,7 +1459,7 @@ func (e *mutableStateBuilder) AddDecisionTaskStartedEvent(scheduleEventID int64, scheduleID := di.ScheduleID startedID := scheduleID + 1 tasklist := request.TaskList.GetName() - timestamp := common.NewRealTimeSource().Now().UnixNano() + timestamp := time.Now().UnixNano() // First check to see if new events came since transient decision was scheduled if di.Attempt > 0 && di.ScheduleID != e.GetNextEventID() { // Also create a new DecisionTaskScheduledEvent since new events came in when it was scheduled @@ -1713,7 +1713,7 @@ func (e *mutableStateBuilder) AddActivityTaskStartedEvent(ai *persistence.Activi // instead update mutable state and will record started event when activity task is closed ai.StartedID = common.TransientEventID ai.RequestID = requestID - ai.StartedTime = common.NewRealTimeSource().Now() + ai.StartedTime = time.Now() ai.StartedIdentity = identity e.UpdateActivity(ai) return nil @@ -2337,8 +2337,12 @@ func (e *mutableStateBuilder) ReplicateWorkflowExecutionContinuedAsNewEvent(sour TaskList: newExecutionInfo.TaskList, ScheduleID: di.ScheduleID, }} - setTaskVersion(newStateBuilder.GetCurrentVersion(), newTransferTasks, nil) - setTransferTaskTimestamp(common.NewFakeTimeSource().Update(time.Unix(0, startedEvent.GetTimestamp())).Now(), newTransferTasks) + setTaskInfo( + newStateBuilder.GetCurrentVersion(), + common.NewFakeTimeSource().Update(time.Unix(0, startedEvent.GetTimestamp())).Now(), + newTransferTasks, + nil, + ) e.continueAsNew = &persistence.CreateWorkflowExecutionRequest{ // NOTE: there is no replication task for the start / decision scheduled event, diff --git a/service/history/queueAckMgr_test.go b/service/history/queueAckMgr_test.go index 4b81399e65e..009cd23fff0 100644 --- a/service/history/queueAckMgr_test.go +++ b/service/history/queueAckMgr_test.go @@ -30,7 +30,6 @@ import ( "github.com/stretchr/testify/suite" "github.com/uber-common/bark" "github.com/uber-go/tally" - "github.com/uber/cadence/common" "github.com/uber/cadence/common/cache" "github.com/uber/cadence/common/cluster" "github.com/uber/cadence/common/messaging" @@ -124,8 +123,8 @@ func (s *queueAckMgrSuite) SetupTest() { ShardID: 0, RangeID: 1, ClusterTimerAckLevel: map[string]time.Time{ - cluster.TestCurrentClusterName: common.NewRealTimeSource().Now().Add(-8 * time.Second), - cluster.TestAlternativeClusterName: common.NewRealTimeSource().Now().Add(-10 * time.Second), + cluster.TestCurrentClusterName: time.Now().Add(-8 * time.Second), + cluster.TestAlternativeClusterName: time.Now().Add(-10 * time.Second), }, }), transferSequenceNumber: 1, @@ -333,8 +332,8 @@ func (s *queueFailoverAckMgrSuite) SetupTest() { ShardID: 0, RangeID: 1, ClusterTimerAckLevel: map[string]time.Time{ - cluster.TestCurrentClusterName: common.NewRealTimeSource().Now(), - cluster.TestAlternativeClusterName: common.NewRealTimeSource().Now().Add(-10 * time.Second), + cluster.TestCurrentClusterName: time.Now(), + cluster.TestAlternativeClusterName: time.Now().Add(-10 * time.Second), }, }), transferSequenceNumber: 1, diff --git a/service/history/queueProcessor.go b/service/history/queueProcessor.go index f2875f53414..a094e3745d8 100644 --- a/service/history/queueProcessor.go +++ b/service/history/queueProcessor.go @@ -196,7 +196,7 @@ func (p *queueProcessorBase) processBatch(tasksCh chan<- queueTaskInfo) { return } - p.lastPollTime = common.NewRealTimeSource().Now() + p.lastPollTime = time.Now() tasks, more, err := p.ackMgr.readQueueTasks() if err != nil { @@ -251,7 +251,7 @@ func (p *queueProcessorBase) processWithRetry(notificationChan <-chan struct{}, var logger bark.Logger var err error - startTime := common.NewRealTimeSource().Now() + startTime := time.Now() retryCount := 0 op := func() error { @@ -283,14 +283,8 @@ ProcessRetryLoop: if err != nil { if err == ErrTaskRetry { p.metricsClient.IncCounter(p.options.MetricScope, metrics.HistoryTaskStandbyRetryCounter) - timestamp := task.GetVisibilityTimestamp() - for { - <-notificationChan - if p.shard.GetCurrentTime(p.clusterName).After(timestamp) { - continue ProcessRetryLoop - } - } - } else if _, ok := err.(*workflow.DomainNotActiveError); ok && common.NewRealTimeSource().Now().Sub(startTime) > cache.DomainCacheRefreshInterval { + <-notificationChan + } else if _, ok := err.(*workflow.DomainNotActiveError); ok && time.Now().Sub(startTime) > cache.DomainCacheRefreshInterval { p.metricsClient.IncCounter(p.options.MetricScope, metrics.HistoryTaskNotActiveCounter) return } diff --git a/service/history/shardContext.go b/service/history/shardContext.go index ef98570be5f..177b1f70a7c 100644 --- a/service/history/shardContext.go +++ b/service/history/shardContext.go @@ -619,7 +619,7 @@ func (s *shardContextImpl) allocateTimerIDsLocked(timerTasks []persistence.Task) // This is not a common scenario, the shard can move and new host might have a time SKU. // We generate a new timer ID that is above the ack level with an offset. s.logger.Warnf("%v: New timer generated is less than ack level. timestamp: %v, ackLevel: %v", - common.NewRealTimeSource().Now(), ts, s.shardInfo.TimerAckLevel) + time.Now(), ts, s.shardInfo.TimerAckLevel) newTimestamp := s.shardInfo.TimerAckLevel persistence.SetVisibilityTSFrom(task, newTimestamp.Add(time.Second)) } @@ -658,7 +658,7 @@ func (s *shardContextImpl) GetCurrentTime(cluster string) time.Time { if cluster != s.GetService().GetClusterMetadata().GetCurrentClusterName() { return s.standbyClusterCurrentTime[cluster] } - return common.NewRealTimeSource().Now() + return time.Now() } // TODO: This method has too many parameters. Clean it up. Maybe create a struct to pass in as parameter. diff --git a/service/history/timerBuilder.go b/service/history/timerBuilder.go index 4674c74f63e..8b466dacc6e 100644 --- a/service/history/timerBuilder.go +++ b/service/history/timerBuilder.go @@ -175,7 +175,7 @@ func (tb *timerBuilder) AddActivityTimeoutTask(scheduleID int64, timeOutTask := tb.createActivityTimeoutTask(fireTimeout, timeoutType, scheduleID, baseTime) tb.logger.Debugf("%s: Adding Activity Timeout: with timeout: %v sec, TimeoutType: %v, EventID: %v", - common.NewRealTimeSource().Now(), fireTimeout, timeoutType.String(), timeOutTask.EventID) + time.Now(), fireTimeout, timeoutType.String(), timeOutTask.EventID) return timeOutTask } @@ -249,7 +249,7 @@ func (tb *timerBuilder) GetActivityTimerTaskIfNeeded(msBuilder mutableState) per msBuilder.UpdateActivity(ai) tb.logger.Debugf("%s: Adding Activity Timeout: with timeout: %v sec, ExpiryTime: %s, TimeoutType: %v, EventID: %v", - common.NewRealTimeSource().Now(), td.TimeoutSec, at.VisibilityTimestamp, td.TimeoutType.String(), at.EventID) + time.Now(), td.TimeoutSec, at.VisibilityTimestamp, td.TimeoutType.String(), at.EventID) } return timerTask } diff --git a/service/history/timerGate.go b/service/history/timerGate.go index f2781a0d31c..9722f69ab32 100644 --- a/service/history/timerGate.go +++ b/service/history/timerGate.go @@ -23,8 +23,6 @@ package history import ( "sync" "time" - - "github.com/uber/cadence/common" ) type ( @@ -133,7 +131,7 @@ func (timerGate *LocalTimerGateImpl) FireAfter(now time.Time) bool { // success means timer is idle or timer is set with a sooner time to fire func (timerGate *LocalTimerGateImpl) Update(nextTime time.Time) bool { // NOTE: negative duration will make the timer fire immediately - now := common.NewRealTimeSource().Now() + now := time.Now() if timerGate.timer.Stop() && timerGate.nextWakeupTime.Before(nextTime) { // this means the timer, before stopped, is active && next wake up time do not have to be updated diff --git a/service/history/timerQueueActiveProcessor.go b/service/history/timerQueueActiveProcessor.go index 145f684a654..53068feb5d9 100644 --- a/service/history/timerQueueActiveProcessor.go +++ b/service/history/timerQueueActiveProcessor.go @@ -359,7 +359,7 @@ Update_History_Loop: createNewTimer = true t.logger.Debugf("Ignore ActivityTimeout (%v) as retry is needed. New attempt: %v, retry backoff duration: %v.", - timeoutType, ai.Attempt, retryTask.(*persistence.RetryTimerTask).VisibilityTimestamp.Sub(common.NewRealTimeSource().Now())) + timeoutType, ai.Attempt, retryTask.(*persistence.RetryTimerTask).VisibilityTimestamp.Sub(time.Now())) continue } @@ -421,7 +421,7 @@ Update_History_Loop: createNewTimer = true t.logger.Debugf("%s: Adding Activity Timeout: with timeout: %v sec, ExpiryTime: %s, TimeoutType: %v, EventID: %v", - common.NewRealTimeSource().Now(), td.TimeoutSec, at.VisibilityTimestamp, td.TimeoutType.String(), at.EventID) + time.Now(), td.TimeoutSec, at.VisibilityTimestamp, td.TimeoutType.String(), at.EventID) } // Done! diff --git a/service/history/timerQueueProcessorBase.go b/service/history/timerQueueProcessorBase.go index 6551fdea5ce..8bfdaca4e1b 100644 --- a/service/history/timerQueueProcessorBase.go +++ b/service/history/timerQueueProcessorBase.go @@ -313,7 +313,7 @@ func (t *timerQueueProcessorBase) internalProcessor() error { t.config.TimerProcessorMaxPollInterval(), t.config.TimerProcessorMaxPollIntervalJitterCoefficient(), )) - if t.lastPollTime.Add(t.config.TimerProcessorMaxPollInterval()).Before(common.NewRealTimeSource().Now()) { + if t.lastPollTime.Add(t.config.TimerProcessorMaxPollInterval()).Before(time.Now()) { lookAheadTimer, err := t.readAndFanoutTimerTasks() if err != nil { return err @@ -342,7 +342,7 @@ func (t *timerQueueProcessorBase) readAndFanoutTimerTasks() (*persistence.TimerT return nil, nil } - t.lastPollTime = common.NewRealTimeSource().Now() + t.lastPollTime = time.Now() timerTasks, lookAheadTask, moreTasks, err := t.timerQueueAckMgr.readTimerTasks() if err != nil { return nil, err @@ -374,7 +374,7 @@ func (t *timerQueueProcessorBase) processWithRetry(notificationChan <-chan struc var logger bark.Logger var err error - startTime := common.NewRealTimeSource().Now() + startTime := time.Now() attempt := 0 op := func() error { @@ -406,9 +406,8 @@ ProcessRetryLoop: if err != nil { if err == ErrTaskRetry { t.metricsClient.IncCounter(t.scope, metrics.HistoryTaskStandbyRetryCounter) - // the timer has already been delayed, so just wait on the notificationChan <-notificationChan - } else if _, ok := err.(*workflow.DomainNotActiveError); ok && common.NewRealTimeSource().Now().Sub(startTime) > cache.DomainCacheRefreshInterval { + } else if _, ok := err.(*workflow.DomainNotActiveError); ok && time.Now().Sub(startTime) > cache.DomainCacheRefreshInterval { t.metricsClient.IncCounter(t.scope, metrics.HistoryTaskNotActiveCounter) return } diff --git a/service/history/workflowExecutionContext.go b/service/history/workflowExecutionContext.go index f41f2ac5da9..fb8d1e50534 100644 --- a/service/history/workflowExecutionContext.go +++ b/service/history/workflowExecutionContext.go @@ -187,7 +187,7 @@ func (c *workflowExecutionContext) updateWorkflowExecution(transferTasks []persi c.msBuilder.GetExecutionInfo().LastFirstEventID, c.msBuilder.GetExecutionInfo().NextEventID) } - now := common.NewRealTimeSource().Now() + now := time.Now() return c.updateHelper(nil, transferTasks, timerTasks, c.createReplicationTask, "", currentVersion, transactionID, now) } @@ -267,8 +267,7 @@ func (c *workflowExecutionContext) updateHelper(builder *historyBuilder, transfe replicationTasks = append(replicationTasks, c.msBuilder.CreateReplicationTask()) } - setTaskVersion(c.msBuilder.GetCurrentVersion(), transferTasks, timerTasks) - setTransferTaskTimestamp(now, transferTasks) + setTaskInfo(c.msBuilder.GetCurrentVersion(), now, transferTasks, timerTasks) if err1 := c.updateWorkflowExecutionWithRetry(&persistence.UpdateWorkflowExecutionRequest{ ExecutionInfo: executionInfo, @@ -310,7 +309,7 @@ func (c *workflowExecutionContext) updateHelper(builder *historyBuilder, transfe // Update went through so update the condition for new updates c.updateCondition = c.msBuilder.GetNextEventID() - c.msBuilder.GetExecutionInfo().LastUpdatedTimestamp = common.NewRealTimeSource().Now() + c.msBuilder.GetExecutionInfo().LastUpdatedTimestamp = time.Now() // for any change in the workflow, send a event c.shard.NotifyNewHistoryEvent(newHistoryEventNotification(