From 3a1b707f39d24e122c43b00fcfc2b7c2502323fb Mon Sep 17 00:00:00 2001 From: Yuri Date: Mon, 18 Nov 2024 14:10:32 -0800 Subject: [PATCH] Refactoring: rename UpdateActivityWithCallback method on mutable state to UpdateActivity (#6832) ## What changed? 1. I change every UpdateActivity to UpdateActivityWithCallback everywhere in the code. 2. I removed old UpdateActivity 3. I rename UpdateActivityWithCallback to UpdateActivity 4. Fix tests, etc. ## Why? Old UpdateActivity was not actually working. ActivityInfo in pending activities is a pointer, so calculating size was wrong. Few other reasons. ## How did you test it? unit tests ## Risks With this change the size of mutable state is calculated properly. This means that in some cases it may change, thus becoming larger then before, and trigger some errors. ## Documentation ## Is hotfix candidate? No --------- Co-authored-by: Alex Shtin --- service/history/api/pauseactivity/api_test.go | 2 +- .../history/api/updateactivityoptions/api.go | 36 +++++----- .../api/updateactivityoptions/api_test.go | 2 +- service/history/ndc/workflow_resetter.go | 10 +-- service/history/ndc/workflow_resetter_test.go | 8 +-- .../timer_queue_active_task_executor.go | 8 ++- .../timer_queue_standby_task_executor.go | 9 ++- service/history/worker_versioning_util.go | 5 +- service/history/workflow/activity.go | 2 +- service/history/workflow/mutable_state.go | 7 +- .../history/workflow/mutable_state_impl.go | 72 ++++++++----------- .../history/workflow/mutable_state_mock.go | 36 +++------- service/history/workflow/task_refresher.go | 7 +- service/history/workflow/timer_sequence.go | 15 ++-- .../history/workflow/timer_sequence_test.go | 8 +-- 15 files changed, 99 insertions(+), 128 deletions(-) diff --git a/service/history/api/pauseactivity/api_test.go b/service/history/api/pauseactivity/api_test.go index 26491192b01..a23b624b2b6 100644 --- a/service/history/api/pauseactivity/api_test.go +++ b/service/history/api/pauseactivity/api_test.go @@ -132,7 +132,7 @@ func (s *pauseActivitySuite) TestPauseActivityAcceptance() { s.mockMutableState.EXPECT().IsWorkflowExecutionRunning().Return(true) s.mockMutableState.EXPECT().GetActivityByActivityID(gomock.Any()).Return(activityInfo, true) - s.mockMutableState.EXPECT().UpdateActivityWithCallback(gomock.Any(), gomock.Any()) + s.mockMutableState.EXPECT().UpdateActivity(gomock.Any(), gomock.Any()) err := workflow.PauseActivityById(s.mockMutableState, activityId) s.NoError(err) diff --git a/service/history/api/updateactivityoptions/api.go b/service/history/api/updateactivityoptions/api.go index d66c4a0e5a2..bb38c1cd91a 100644 --- a/service/history/api/updateactivityoptions/api.go +++ b/service/history/api/updateactivityoptions/api.go @@ -32,6 +32,7 @@ import ( "go.temporal.io/api/serviceerror" taskqueuepb "go.temporal.io/api/taskqueue/v1" "go.temporal.io/server/api/historyservice/v1" + persistencespb "go.temporal.io/server/api/persistence/v1" "go.temporal.io/server/common/definition" "go.temporal.io/server/common/namespace" "go.temporal.io/server/common/util" @@ -141,23 +142,24 @@ func updateActivityOptions( return nil, err } - // update activity info with new options - ai.TaskQueue = adjustedOptions.TaskQueue.Name - ai.ScheduleToCloseTimeout = adjustedOptions.ScheduleToCloseTimeout - ai.ScheduleToStartTimeout = adjustedOptions.ScheduleToStartTimeout - ai.StartToCloseTimeout = adjustedOptions.StartToCloseTimeout - ai.HeartbeatTimeout = adjustedOptions.HeartbeatTimeout - ai.RetryMaximumInterval = adjustedOptions.RetryPolicy.MaximumInterval - ai.RetryBackoffCoefficient = adjustedOptions.RetryPolicy.BackoffCoefficient - ai.RetryMaximumInterval = adjustedOptions.RetryPolicy.MaximumInterval - ai.RetryMaximumAttempts = adjustedOptions.RetryPolicy.MaximumAttempts - - // move forward activity version - ai.Stamp++ - - // invalidate timers - ai.TimerTaskStatus = workflow.TimerTaskStatusNone - if err := mutableState.UpdateActivity(ai); err != nil { + if err := mutableState.UpdateActivity(ai.ScheduledEventId, func(activityInfo *persistencespb.ActivityInfo, _ workflow.MutableState) { + // update activity info with new options + activityInfo.TaskQueue = adjustedOptions.TaskQueue.Name + activityInfo.ScheduleToCloseTimeout = adjustedOptions.ScheduleToCloseTimeout + activityInfo.ScheduleToStartTimeout = adjustedOptions.ScheduleToStartTimeout + activityInfo.StartToCloseTimeout = adjustedOptions.StartToCloseTimeout + activityInfo.HeartbeatTimeout = adjustedOptions.HeartbeatTimeout + activityInfo.RetryMaximumInterval = adjustedOptions.RetryPolicy.MaximumInterval + activityInfo.RetryBackoffCoefficient = adjustedOptions.RetryPolicy.BackoffCoefficient + activityInfo.RetryMaximumInterval = adjustedOptions.RetryPolicy.MaximumInterval + activityInfo.RetryMaximumAttempts = adjustedOptions.RetryPolicy.MaximumAttempts + + // move forward activity version + activityInfo.Stamp++ + + // invalidate timers + activityInfo.TimerTaskStatus = workflow.TimerTaskStatusNone + }); err != nil { return nil, err } diff --git a/service/history/api/updateactivityoptions/api_test.go b/service/history/api/updateactivityoptions/api_test.go index ae9874ec197..9068255676d 100644 --- a/service/history/api/updateactivityoptions/api_test.go +++ b/service/history/api/updateactivityoptions/api_test.go @@ -395,7 +395,7 @@ func (s *activityOptionsSuite) Test_updateActivityOptionsAcceptance() { s.mockMutableState.EXPECT().IsWorkflowExecutionRunning().Return(true) s.mockMutableState.EXPECT().GetActivityByActivityID(gomock.Any()).Return(fullActivityInfo, true) s.mockMutableState.EXPECT().RegenerateActivityRetryTask(gomock.Any(), gomock.Any()).Return(nil) - s.mockMutableState.EXPECT().UpdateActivity(gomock.Any()).Return(nil) + s.mockMutableState.EXPECT().UpdateActivity(gomock.Any(), gomock.Any()).Return(nil) request := &historyservice.UpdateActivityOptionsRequest{ UpdateRequest: &workflowservicepb.UpdateActivityOptionsByIdRequest{ diff --git a/service/history/ndc/workflow_resetter.go b/service/history/ndc/workflow_resetter.go index e2d38bbef5c..80dd93dff69 100644 --- a/service/history/ndc/workflow_resetter.go +++ b/service/history/ndc/workflow_resetter.go @@ -36,6 +36,7 @@ import ( enumspb "go.temporal.io/api/enums/v1" historypb "go.temporal.io/api/history/v1" "go.temporal.io/api/serviceerror" + persistencespb "go.temporal.io/server/api/persistence/v1" "go.temporal.io/server/common" "go.temporal.io/server/common/cluster" "go.temporal.io/server/common/collection" @@ -550,10 +551,11 @@ func (r *workflowResetterImpl) failInflightActivity( switch ai.StartedEventId { case common.EmptyEventID: // activity not started, noop - // override the scheduled activity time to now - ai.ScheduledTime = timestamppb.New(now) - ai.FirstScheduledTime = timestamppb.New(now) - if err := mutableState.UpdateActivity(ai); err != nil { + if err := mutableState.UpdateActivity(ai.ScheduledEventId, func(activityInfo *persistencespb.ActivityInfo, _ workflow.MutableState) { + // override the scheduled activity time to now + activityInfo.ScheduledTime = timestamppb.New(now) + activityInfo.FirstScheduledTime = timestamppb.New(now) + }); err != nil { return err } diff --git a/service/history/ndc/workflow_resetter_test.go b/service/history/ndc/workflow_resetter_test.go index f6a7883ceb0..a5241749aaf 100644 --- a/service/history/ndc/workflow_resetter_test.go +++ b/service/history/ndc/workflow_resetter_test.go @@ -493,13 +493,7 @@ func (s *workflowResetterSuite) TestFailInflightActivity() { activity1.LastWorkerVersionStamp, ).Return(&historypb.HistoryEvent{}, nil) - mutableState.EXPECT().UpdateActivity(&persistencespb.ActivityInfo{ - Version: activity2.Version, - ScheduledEventId: activity2.ScheduledEventId, - ScheduledTime: timestamppb.New(now), - FirstScheduledTime: timestamppb.New(now), - StartedEventId: activity2.StartedEventId, - }).Return(nil) + mutableState.EXPECT().UpdateActivity(gomock.Any(), gomock.Any()).Return(nil) err := s.workflowResetter.failInflightActivity(now, mutableState, terminateReason) s.NoError(err) diff --git a/service/history/timer_queue_active_task_executor.go b/service/history/timer_queue_active_task_executor.go index 9554a798728..618a1f0a55e 100644 --- a/service/history/timer_queue_active_task_executor.go +++ b/service/history/timer_queue_active_task_executor.go @@ -241,10 +241,12 @@ func (t *timerQueueActiveTaskExecutor) executeActivityTimeoutTask( // for updating workflow execution. In that case, only one new heartbeat timeout task should be // created. isHeartBeatTask := task.TimeoutType == enumspb.TIMEOUT_TYPE_HEARTBEAT - activityInfo, heartbeatTimeoutVis, ok := mutableState.GetActivityInfoWithTimerHeartbeat(task.EventID) + ai, heartbeatTimeoutVis, ok := mutableState.GetActivityInfoWithTimerHeartbeat(task.EventID) if isHeartBeatTask && ok && queues.IsTimeExpired(task.GetVisibilityTime(), heartbeatTimeoutVis) { - activityInfo.TimerTaskStatus = activityInfo.TimerTaskStatus &^ workflow.TimerTaskStatusCreatedHeartbeat - if err := mutableState.UpdateActivity(activityInfo); err != nil { + err := mutableState.UpdateActivity(ai.ScheduledEventId, func(activityInfo *persistencespb.ActivityInfo, _ workflow.MutableState) { + activityInfo.TimerTaskStatus = activityInfo.TimerTaskStatus &^ workflow.TimerTaskStatusCreatedHeartbeat + }) + if err != nil { return err } updateMutableState = true diff --git a/service/history/timer_queue_standby_task_executor.go b/service/history/timer_queue_standby_task_executor.go index 91a6ff317d7..88abb5f7693 100644 --- a/service/history/timer_queue_standby_task_executor.go +++ b/service/history/timer_queue_standby_task_executor.go @@ -36,6 +36,7 @@ import ( taskqueuepb "go.temporal.io/api/taskqueue/v1" enumsspb "go.temporal.io/server/api/enums/v1" "go.temporal.io/server/api/matchingservice/v1" + persistencespb "go.temporal.io/server/api/persistence/v1" "go.temporal.io/server/common" "go.temporal.io/server/common/log" "go.temporal.io/server/common/log/tag" @@ -243,10 +244,12 @@ func (t *timerQueueStandbyTaskExecutor) executeActivityTimeoutTask( // for updating workflow execution. In that case, only one new heartbeat timeout task should be // created. isHeartBeatTask := timerTask.TimeoutType == enumspb.TIMEOUT_TYPE_HEARTBEAT - activityInfo, heartbeatTimeoutVis, ok := mutableState.GetActivityInfoWithTimerHeartbeat(timerTask.EventID) + ai, heartbeatTimeoutVis, ok := mutableState.GetActivityInfoWithTimerHeartbeat(timerTask.EventID) if isHeartBeatTask && ok && queues.IsTimeExpired(timerTask.GetVisibilityTime(), heartbeatTimeoutVis) { - activityInfo.TimerTaskStatus = activityInfo.TimerTaskStatus &^ workflow.TimerTaskStatusCreatedHeartbeat - if err := mutableState.UpdateActivity(activityInfo); err != nil { + err := mutableState.UpdateActivity(ai.ScheduledEventId, func(activityInfo *persistencespb.ActivityInfo, _ workflow.MutableState) { + activityInfo.TimerTaskStatus = activityInfo.TimerTaskStatus &^ workflow.TimerTaskStatusCreatedHeartbeat + }) + if err != nil { return nil, err } updateMutableState = true diff --git a/service/history/worker_versioning_util.go b/service/history/worker_versioning_util.go index 12223401295..5f0be7d5c94 100644 --- a/service/history/worker_versioning_util.go +++ b/service/history/worker_versioning_util.go @@ -119,8 +119,9 @@ func updateIndependentActivityBuildId( return err } - ai.BuildIdInfo = &persistencespb.ActivityInfo_LastIndependentlyAssignedBuildId{LastIndependentlyAssignedBuildId: buildId} - err = mutableState.UpdateActivity(ai) + err = mutableState.UpdateActivity(ai.ScheduledEventId, func(activityInfo *persistencespb.ActivityInfo, _ workflow.MutableState) { + activityInfo.BuildIdInfo = &persistencespb.ActivityInfo_LastIndependentlyAssignedBuildId{LastIndependentlyAssignedBuildId: buildId} + }) if err != nil { return err } diff --git a/service/history/workflow/activity.go b/service/history/workflow/activity.go index 2e4f37907c6..759ee450278 100644 --- a/service/history/workflow/activity.go +++ b/service/history/workflow/activity.go @@ -210,7 +210,7 @@ func PauseActivityById(mutableState MutableState, activityId string) error { return nil } - return mutableState.UpdateActivityWithCallback(ai.ActivityId, func(activityInfo *persistence.ActivityInfo, _ MutableState) { + return mutableState.UpdateActivity(ai.ScheduledEventId, func(activityInfo *persistence.ActivityInfo, _ MutableState) { // note - we are not increasing the stamp of the activity if it is running. // this is because if activity is actually running we should let it finish if GetActivityState(activityInfo) == enumspb.PENDING_ACTIVITY_STATE_SCHEDULED { diff --git a/service/history/workflow/mutable_state.go b/service/history/workflow/mutable_state.go index 008d2033e2a..a36fe2ba53a 100644 --- a/service/history/workflow/mutable_state.go +++ b/service/history/workflow/mutable_state.go @@ -145,7 +145,7 @@ type ( MaxSearchAttributeValueSize int } - UpdateActivityCallback func(*persistencespb.ActivityInfo, MutableState) + ActivityUpdater func(*persistencespb.ActivityInfo, MutableState) MutableState interface { callbacks.CanGetNexusCompletion @@ -360,9 +360,8 @@ type ( baseRunLowestCommonAncestorEventID int64, baseRunLowestCommonAncestorEventVersion int64, ) - UpdateActivity(*persistencespb.ActivityInfo) error - UpdateActivityWithTimerHeartbeat(*persistencespb.ActivityInfo, time.Time) error - UpdateActivityWithCallback(string, UpdateActivityCallback) error + UpdateActivity(int64, ActivityUpdater) error + UpdateActivityTimerHeartbeat(int64, time.Time) UpdateActivityProgress(ai *persistencespb.ActivityInfo, request *workflowservice.RecordActivityTaskHeartbeatRequest) UpdateUserTimer(*persistencespb.TimerInfo) error UpdateCurrentVersion(version int64, forceUpdate bool) error diff --git a/service/history/workflow/mutable_state_impl.go b/service/history/workflow/mutable_state_impl.go index 8df4c4b27ab..854b1a196b2 100644 --- a/service/history/workflow/mutable_state_impl.go +++ b/service/history/workflow/mutable_state_impl.go @@ -1659,37 +1659,9 @@ func (ms *MutableStateImpl) UpdateActivityInfo( return err } -// UpdateActivity updates an activity -func (ms *MutableStateImpl) UpdateActivity( - ai *persistencespb.ActivityInfo, -) error { - prev, ok := ms.pendingActivityInfoIDs[ai.ScheduledEventId] - if !ok { - ms.logError( - fmt.Sprintf("unable to find activity ID: %v in mutable state", ai.ActivityId), - tag.ErrorTypeInvalidMutableStateAction, - ) - return ErrMissingActivityInfo - } - - ms.pendingActivityInfoIDs[ai.ScheduledEventId] = ai - ms.updateActivityInfos[ai.ScheduledEventId] = ai - ms.approximateSize += ai.Size() - prev.Size() - return nil -} - // UpdateActivityWithTimerHeartbeat updates an activity -func (ms *MutableStateImpl) UpdateActivityWithTimerHeartbeat( - ai *persistencespb.ActivityInfo, - timerTimeoutVisibility time.Time, -) error { - err := ms.UpdateActivity(ai) - if err != nil { - return err - } - - ms.pendingActivityTimerHeartbeats[ai.ScheduledEventId] = timerTimeoutVisibility - return nil +func (ms *MutableStateImpl) UpdateActivityTimerHeartbeat(scheduledEventId int64, timerTimeoutVisibility time.Time) { + ms.pendingActivityTimerHeartbeats[scheduledEventId] = timerTimeoutVisibility } // DeleteActivity deletes details about an activity. @@ -3045,14 +3017,16 @@ func (ms *MutableStateImpl) AddActivityTaskStartedEvent( return nil, err } - // we might need to retry, so do not append started event just yet, - // instead update mutable state and will record started event when activity task is closed - ai.Version = ms.GetCurrentVersion() - ai.StartedEventId = common.TransientEventID - ai.RequestId = requestID - ai.StartedTime = timestamppb.New(ms.timeSource.Now()) - ai.StartedIdentity = identity - if err := ms.UpdateActivity(ai); err != nil { + if err := ms.UpdateActivity(ai.ScheduledEventId, func(activityInfo *persistencespb.ActivityInfo, _ MutableState) { + // we might need to retry, so do not append started event just yet, + // instead update mutable state and will record started event when activity task is closed + activityInfo.Version = ms.GetCurrentVersion() + activityInfo.StartedEventId = common.TransientEventID + activityInfo.RequestId = requestID + activityInfo.StartedTime = timestamppb.New(ms.timeSource.Now()) + activityInfo.StartedIdentity = identity + + }); err != nil { return nil, err } ms.syncActivityTasks[ai.ScheduledEventId] = struct{}{} @@ -4927,7 +4901,7 @@ func (ms *MutableStateImpl) RetryActivity( } func (ms *MutableStateImpl) RecordLastActivityCompleteTime(ai *persistencespb.ActivityInfo) { - _ = ms.UpdateActivityWithCallback(ai.ActivityId, func(info *persistencespb.ActivityInfo, _ MutableState) { + _ = ms.UpdateActivity(ai.ScheduledEventId, func(info *persistencespb.ActivityInfo, _ MutableState) { ai.LastAttemptCompleteTime = timestamppb.New(ms.shard.GetTimeSource().Now().UTC()) }) } @@ -4952,11 +4926,11 @@ func (ms *MutableStateImpl) updateActivityInfoForRetries( nextAttempt int32, activityFailure *failurepb.Failure, ) { - _ = ms.UpdateActivityWithCallback(ai.ActivityId, func(info *persistencespb.ActivityInfo, mutableState MutableState) { + _ = ms.UpdateActivity(ai.ScheduledEventId, func(activityInfo *persistencespb.ActivityInfo, mutableState MutableState) { mutableStateImpl, ok := mutableState.(*MutableStateImpl) if ok { ai = UpdateActivityInfoForRetries( - ai, + activityInfo, mutableStateImpl.GetCurrentVersion(), nextAttempt, mutableStateImpl.truncateRetryableActivityFailure(activityFailure), @@ -4966,8 +4940,18 @@ func (ms *MutableStateImpl) updateActivityInfoForRetries( }) } -func (ms *MutableStateImpl) UpdateActivityWithCallback(activityId string, updateCallback UpdateActivityCallback) error { - ai, activityFound := ms.GetActivityByActivityID(activityId) +/* +UpdateActivity function updates the existing pending activity via the updater callback. +To update an activity, we need to do the following steps: + * preserve the original size of the activity + * preserve the original state of the activity + * call the updater callback to update the activity + * calculate new size of the activity + * respond to the changes of the activity state (like changes to pause) +*/ + +func (ms *MutableStateImpl) UpdateActivity(scheduledEventId int64, updater ActivityUpdater) error { + ai, activityFound := ms.GetActivityInfo(scheduledEventId) if !activityFound { return consts.ErrActivityNotFound } @@ -4979,7 +4963,7 @@ func (ms *MutableStateImpl) UpdateActivityWithCallback(activityId string, update prevPause = prev.Paused } - updateCallback(ai, ms) + updater(ai, ms) if prevPause != ai.Paused { err := ms.updatePauseInfoSearchAttribute() diff --git a/service/history/workflow/mutable_state_mock.go b/service/history/workflow/mutable_state_mock.go index d118da2a656..f83f98c8239 100644 --- a/service/history/workflow/mutable_state_mock.go +++ b/service/history/workflow/mutable_state_mock.go @@ -3068,17 +3068,17 @@ func (mr *MockMutableStateMockRecorder) TaskQueueScheduleToStartTimeout(name any } // UpdateActivity mocks base method. -func (m *MockMutableState) UpdateActivity(arg0 *persistence.ActivityInfo) error { +func (m *MockMutableState) UpdateActivity(arg0 int64, arg1 ActivityUpdater) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "UpdateActivity", arg0) + ret := m.ctrl.Call(m, "UpdateActivity", arg0, arg1) ret0, _ := ret[0].(error) return ret0 } // UpdateActivity indicates an expected call of UpdateActivity. -func (mr *MockMutableStateMockRecorder) UpdateActivity(arg0 any) *gomock.Call { +func (mr *MockMutableStateMockRecorder) UpdateActivity(arg0, arg1 any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateActivity", reflect.TypeOf((*MockMutableState)(nil).UpdateActivity), arg0) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateActivity", reflect.TypeOf((*MockMutableState)(nil).UpdateActivity), arg0, arg1) } // UpdateActivityInfo mocks base method. @@ -3107,32 +3107,16 @@ func (mr *MockMutableStateMockRecorder) UpdateActivityProgress(ai, request any) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateActivityProgress", reflect.TypeOf((*MockMutableState)(nil).UpdateActivityProgress), ai, request) } -// UpdateActivityWithCallback mocks base method. -func (m *MockMutableState) UpdateActivityWithCallback(arg0 string, arg1 UpdateActivityCallback) error { +// UpdateActivityTimerHeartbeat mocks base method. +func (m *MockMutableState) UpdateActivityTimerHeartbeat(arg0 int64, arg1 time.Time) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "UpdateActivityWithCallback", arg0, arg1) - ret0, _ := ret[0].(error) - return ret0 -} - -// UpdateActivityWithCallback indicates an expected call of UpdateActivityWithCallback. -func (mr *MockMutableStateMockRecorder) UpdateActivityWithCallback(arg0, arg1 any) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateActivityWithCallback", reflect.TypeOf((*MockMutableState)(nil).UpdateActivityWithCallback), arg0, arg1) -} - -// UpdateActivityWithTimerHeartbeat mocks base method. -func (m *MockMutableState) UpdateActivityWithTimerHeartbeat(arg0 *persistence.ActivityInfo, arg1 time.Time) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "UpdateActivityWithTimerHeartbeat", arg0, arg1) - ret0, _ := ret[0].(error) - return ret0 + m.ctrl.Call(m, "UpdateActivityTimerHeartbeat", arg0, arg1) } -// UpdateActivityWithTimerHeartbeat indicates an expected call of UpdateActivityWithTimerHeartbeat. -func (mr *MockMutableStateMockRecorder) UpdateActivityWithTimerHeartbeat(arg0, arg1 any) *gomock.Call { +// UpdateActivityTimerHeartbeat indicates an expected call of UpdateActivityTimerHeartbeat. +func (mr *MockMutableStateMockRecorder) UpdateActivityTimerHeartbeat(arg0, arg1 any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateActivityWithTimerHeartbeat", reflect.TypeOf((*MockMutableState)(nil).UpdateActivityWithTimerHeartbeat), arg0, arg1) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateActivityTimerHeartbeat", reflect.TypeOf((*MockMutableState)(nil).UpdateActivityTimerHeartbeat), arg0, arg1) } // UpdateBuildIdAssignment mocks base method. diff --git a/service/history/workflow/task_refresher.go b/service/history/workflow/task_refresher.go index 2ad3d736d01..27d0fb01b8a 100644 --- a/service/history/workflow/task_refresher.go +++ b/service/history/workflow/task_refresher.go @@ -389,12 +389,13 @@ func (r *TaskRefresherImpl) refreshTasksForActivity( } if CompareVersionedTransition(minVersionedTransition, EmptyVersionedTransition) == 0 { // Full refresh - // clear activity timer task mask for later activity timer task re-generation - activityInfo.TimerTaskStatus = TimerTaskStatusNone // need to update activity timer task mask for which task is generated if err := mutableState.UpdateActivity( - activityInfo, + activityInfo.ScheduledEventId, func(ai *persistencespb.ActivityInfo, _ MutableState) { + // clear activity timer task mask for later activity timer task re-generation + activityInfo.TimerTaskStatus = TimerTaskStatusNone + }, ); err != nil { return err } diff --git a/service/history/workflow/timer_sequence.go b/service/history/workflow/timer_sequence.go index 15162eb7cb5..96b3ed97dc4 100644 --- a/service/history/workflow/timer_sequence.go +++ b/service/history/workflow/timer_sequence.go @@ -153,15 +153,14 @@ func (t *timerSequenceImpl) CreateNextActivityTimer() (bool, error) { if !ok { return false, serviceerror.NewInternal(fmt.Sprintf("unable to load activity info %v", firstTimerTask.EventID)) } - // mark timer task mask as indication that timer task is generated - activityInfo.TimerTaskStatus |= timerTypeToTimerMask(firstTimerTask.TimerType) - var err error - if firstTimerTask.TimerType == enumspb.TIMEOUT_TYPE_HEARTBEAT { - err = t.mutableState.UpdateActivityWithTimerHeartbeat(activityInfo, firstTimerTask.Timestamp) - } else { - err = t.mutableState.UpdateActivity(activityInfo) - } + err := t.mutableState.UpdateActivity(activityInfo.ScheduledEventId, func(ai *persistencespb.ActivityInfo, ms MutableState) { + // mark timer task mask as indication that timer task is generated + ai.TimerTaskStatus |= timerTypeToTimerMask(firstTimerTask.TimerType) + if firstTimerTask.TimerType == enumspb.TIMEOUT_TYPE_HEARTBEAT { + t.mutableState.UpdateActivityTimerHeartbeat(ai.ScheduledEventId, firstTimerTask.Timestamp) + } + }) if err != nil { return false, err diff --git a/service/history/workflow/timer_sequence_test.go b/service/history/workflow/timer_sequence_test.go index 6f2a9f3e75d..42088c800a7 100644 --- a/service/history/workflow/timer_sequence_test.go +++ b/service/history/workflow/timer_sequence_test.go @@ -375,7 +375,7 @@ func (s *timerSequenceSuite) TestCreateNextActivityTimer_NotCreated_BeforeWorkfl var activityInfoUpdated = common.CloneProto(activityInfo) // make a copy activityInfoUpdated.TimerTaskStatus = TimerTaskStatusCreatedScheduleToStart - s.mockMutableState.EXPECT().UpdateActivity(protomock.Eq(activityInfoUpdated)).Return(nil) + s.mockMutableState.EXPECT().UpdateActivity(protomock.Eq(activityInfo.ScheduledEventId), gomock.Any()).Return(nil) s.mockMutableState.EXPECT().AddTasks(&tasks.ActivityTimeoutTask{ // TaskID is set by shard WorkflowKey: s.workflowKey, @@ -417,7 +417,7 @@ func (s *timerSequenceSuite) TestCreateNextActivityTimer_NotCreated_NoWorkflowEx var activityInfoUpdated = common.CloneProto(activityInfo) // make a copy activityInfoUpdated.TimerTaskStatus = TimerTaskStatusCreatedScheduleToStart - s.mockMutableState.EXPECT().UpdateActivity(protomock.Eq(activityInfoUpdated)).Return(nil) + s.mockMutableState.EXPECT().UpdateActivity(protomock.Eq(activityInfo.ScheduledEventId), gomock.Any()).Return(nil) s.mockMutableState.EXPECT().AddTasks(&tasks.ActivityTimeoutTask{ // TaskID is set by shard WorkflowKey: s.workflowKey, @@ -490,7 +490,7 @@ func (s *timerSequenceSuite) TestCreateNextActivityTimer_HeartbeatTimer_BeforeWo var activityInfoUpdated = common.CloneProto(activityInfo) // make a copy activityInfoUpdated.TimerTaskStatus = TimerTaskStatusCreatedHeartbeat - s.mockMutableState.EXPECT().UpdateActivityWithTimerHeartbeat(protomock.Eq(activityInfoUpdated), taskVisibilityTimestamp).Return(nil) + s.mockMutableState.EXPECT().UpdateActivity(protomock.Eq(activityInfoUpdated.ScheduledEventId), gomock.Any()).Return(nil) s.mockMutableState.EXPECT().AddTasks(&tasks.ActivityTimeoutTask{ // TaskID is set by shard WorkflowKey: s.workflowKey, @@ -534,7 +534,7 @@ func (s *timerSequenceSuite) TestCreateNextActivityTimer_HeartbeatTimer_NoWorkfl var activityInfoUpdated = common.CloneProto(activityInfo) // make a copy activityInfoUpdated.TimerTaskStatus = TimerTaskStatusCreatedHeartbeat - s.mockMutableState.EXPECT().UpdateActivityWithTimerHeartbeat(protomock.Eq(activityInfoUpdated), taskVisibilityTimestamp).Return(nil) + s.mockMutableState.EXPECT().UpdateActivity(protomock.Eq(activityInfoUpdated.ScheduledEventId), gomock.Any()).Return(nil) s.mockMutableState.EXPECT().AddTasks(&tasks.ActivityTimeoutTask{ // TaskID is set by shard WorkflowKey: s.workflowKey,