Skip to content

Commit

Permalink
Refactoring - UpdateActivity
Browse files Browse the repository at this point in the history
  • Loading branch information
ychebotarev committed Nov 16, 2024
1 parent 6b0e0fb commit 67c2527
Show file tree
Hide file tree
Showing 15 changed files with 85 additions and 124 deletions.
2 changes: 1 addition & 1 deletion service/history/api/pauseactivity/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func (s *pauseActivitySuite) TestPauseActivityAcceptance() {

s.mockMutableState.EXPECT().IsWorkflowExecutionRunning().Return(true)
s.mockMutableState.EXPECT().GetActivityByActivityID(gomock.Any()).Return(activityInfo, true)
s.mockMutableState.EXPECT().UpdateActivityWithCallback(gomock.Any(), gomock.Any())
s.mockMutableState.EXPECT().UpdateActivity(gomock.Any(), gomock.Any())

err := workflow.PauseActivityById(s.mockMutableState, activityId)
s.NoError(err)
Expand Down
36 changes: 19 additions & 17 deletions service/history/api/updateactivityoptions/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"go.temporal.io/api/serviceerror"
taskqueuepb "go.temporal.io/api/taskqueue/v1"
"go.temporal.io/server/api/historyservice/v1"
persistencespb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common/definition"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/util"
Expand Down Expand Up @@ -141,23 +142,24 @@ func updateActivityOptions(
return nil, err
}

// update activity info with new options
ai.TaskQueue = adjustedOptions.TaskQueue.Name
ai.ScheduleToCloseTimeout = adjustedOptions.ScheduleToCloseTimeout
ai.ScheduleToStartTimeout = adjustedOptions.ScheduleToStartTimeout
ai.StartToCloseTimeout = adjustedOptions.StartToCloseTimeout
ai.HeartbeatTimeout = adjustedOptions.HeartbeatTimeout
ai.RetryMaximumInterval = adjustedOptions.RetryPolicy.MaximumInterval
ai.RetryBackoffCoefficient = adjustedOptions.RetryPolicy.BackoffCoefficient
ai.RetryMaximumInterval = adjustedOptions.RetryPolicy.MaximumInterval
ai.RetryMaximumAttempts = adjustedOptions.RetryPolicy.MaximumAttempts

// move forward activity version
ai.Stamp++

// invalidate timers
ai.TimerTaskStatus = workflow.TimerTaskStatusNone
if err := mutableState.UpdateActivity(ai); err != nil {
if err := mutableState.UpdateActivity(ai.ActivityId, func(activityInfo *persistencespb.ActivityInfo, _ workflow.MutableState) {
// update activity info with new options
activityInfo.TaskQueue = adjustedOptions.TaskQueue.Name
activityInfo.ScheduleToCloseTimeout = adjustedOptions.ScheduleToCloseTimeout
activityInfo.ScheduleToStartTimeout = adjustedOptions.ScheduleToStartTimeout
activityInfo.StartToCloseTimeout = adjustedOptions.StartToCloseTimeout
activityInfo.HeartbeatTimeout = adjustedOptions.HeartbeatTimeout
activityInfo.RetryMaximumInterval = adjustedOptions.RetryPolicy.MaximumInterval
activityInfo.RetryBackoffCoefficient = adjustedOptions.RetryPolicy.BackoffCoefficient
activityInfo.RetryMaximumInterval = adjustedOptions.RetryPolicy.MaximumInterval
activityInfo.RetryMaximumAttempts = adjustedOptions.RetryPolicy.MaximumAttempts

// move forward activity version
activityInfo.Stamp++

// invalidate timers
activityInfo.TimerTaskStatus = workflow.TimerTaskStatusNone
}); err != nil {
return nil, err
}

Expand Down
2 changes: 1 addition & 1 deletion service/history/api/updateactivityoptions/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,7 @@ func (s *activityOptionsSuite) Test_updateActivityOptionsAcceptance() {
s.mockMutableState.EXPECT().IsWorkflowExecutionRunning().Return(true)
s.mockMutableState.EXPECT().GetActivityByActivityID(gomock.Any()).Return(fullActivityInfo, true)
s.mockMutableState.EXPECT().RegenerateActivityRetryTask(gomock.Any(), gomock.Any()).Return(nil)
s.mockMutableState.EXPECT().UpdateActivity(gomock.Any()).Return(nil)
s.mockMutableState.EXPECT().UpdateActivity(gomock.Any(), gomock.Any()).Return(nil)

request := &historyservice.UpdateActivityOptionsRequest{
UpdateRequest: &workflowservicepb.UpdateActivityOptionsByIdRequest{
Expand Down
10 changes: 6 additions & 4 deletions service/history/ndc/workflow_resetter.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
enumspb "go.temporal.io/api/enums/v1"
historypb "go.temporal.io/api/history/v1"
"go.temporal.io/api/serviceerror"
persistencespb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common"
"go.temporal.io/server/common/cluster"
"go.temporal.io/server/common/collection"
Expand Down Expand Up @@ -550,10 +551,11 @@ func (r *workflowResetterImpl) failInflightActivity(
switch ai.StartedEventId {
case common.EmptyEventID:
// activity not started, noop
// override the scheduled activity time to now
ai.ScheduledTime = timestamppb.New(now)
ai.FirstScheduledTime = timestamppb.New(now)
if err := mutableState.UpdateActivity(ai); err != nil {
if err := mutableState.UpdateActivity(ai.ActivityId, func(activityInfo *persistencespb.ActivityInfo, _ workflow.MutableState) {
// override the scheduled activity time to now
activityInfo.ScheduledTime = timestamppb.New(now)
activityInfo.FirstScheduledTime = timestamppb.New(now)
}); err != nil {
return err
}

Expand Down
8 changes: 1 addition & 7 deletions service/history/ndc/workflow_resetter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,13 +493,7 @@ func (s *workflowResetterSuite) TestFailInflightActivity() {
activity1.LastWorkerVersionStamp,
).Return(&historypb.HistoryEvent{}, nil)

mutableState.EXPECT().UpdateActivity(&persistencespb.ActivityInfo{
Version: activity2.Version,
ScheduledEventId: activity2.ScheduledEventId,
ScheduledTime: timestamppb.New(now),
FirstScheduledTime: timestamppb.New(now),
StartedEventId: activity2.StartedEventId,
}).Return(nil)
mutableState.EXPECT().UpdateActivity(gomock.Any(), gomock.Any()).Return(nil)

err := s.workflowResetter.failInflightActivity(now, mutableState, terminateReason)
s.NoError(err)
Expand Down
8 changes: 5 additions & 3 deletions service/history/timer_queue_active_task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,10 +241,12 @@ func (t *timerQueueActiveTaskExecutor) executeActivityTimeoutTask(
// for updating workflow execution. In that case, only one new heartbeat timeout task should be
// created.
isHeartBeatTask := task.TimeoutType == enumspb.TIMEOUT_TYPE_HEARTBEAT
activityInfo, heartbeatTimeoutVis, ok := mutableState.GetActivityInfoWithTimerHeartbeat(task.EventID)
ai, heartbeatTimeoutVis, ok := mutableState.GetActivityInfoWithTimerHeartbeat(task.EventID)
if isHeartBeatTask && ok && queues.IsTimeExpired(task.GetVisibilityTime(), heartbeatTimeoutVis) {
activityInfo.TimerTaskStatus = activityInfo.TimerTaskStatus &^ workflow.TimerTaskStatusCreatedHeartbeat
if err := mutableState.UpdateActivity(activityInfo); err != nil {
err := mutableState.UpdateActivity(ai.ActivityId, func(activityInfo *persistencespb.ActivityInfo, _ workflow.MutableState) {
activityInfo.TimerTaskStatus = activityInfo.TimerTaskStatus &^ workflow.TimerTaskStatusCreatedHeartbeat
})
if err != nil {
return err
}
updateMutableState = true
Expand Down
9 changes: 6 additions & 3 deletions service/history/timer_queue_standby_task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
taskqueuepb "go.temporal.io/api/taskqueue/v1"
enumsspb "go.temporal.io/server/api/enums/v1"
"go.temporal.io/server/api/matchingservice/v1"
persistencespb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
Expand Down Expand Up @@ -243,10 +244,12 @@ func (t *timerQueueStandbyTaskExecutor) executeActivityTimeoutTask(
// for updating workflow execution. In that case, only one new heartbeat timeout task should be
// created.
isHeartBeatTask := timerTask.TimeoutType == enumspb.TIMEOUT_TYPE_HEARTBEAT
activityInfo, heartbeatTimeoutVis, ok := mutableState.GetActivityInfoWithTimerHeartbeat(timerTask.EventID)
ai, heartbeatTimeoutVis, ok := mutableState.GetActivityInfoWithTimerHeartbeat(timerTask.EventID)
if isHeartBeatTask && ok && queues.IsTimeExpired(timerTask.GetVisibilityTime(), heartbeatTimeoutVis) {
activityInfo.TimerTaskStatus = activityInfo.TimerTaskStatus &^ workflow.TimerTaskStatusCreatedHeartbeat
if err := mutableState.UpdateActivity(activityInfo); err != nil {
err := mutableState.UpdateActivity(ai.ActivityId, func(activityInfo *persistencespb.ActivityInfo, _ workflow.MutableState) {
activityInfo.TimerTaskStatus = activityInfo.TimerTaskStatus &^ workflow.TimerTaskStatusCreatedHeartbeat
})
if err != nil {
return nil, err
}
updateMutableState = true
Expand Down
5 changes: 3 additions & 2 deletions service/history/worker_versioning_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,9 @@ func updateIndependentActivityBuildId(
return err
}

ai.BuildIdInfo = &persistencespb.ActivityInfo_LastIndependentlyAssignedBuildId{LastIndependentlyAssignedBuildId: buildId}
err = mutableState.UpdateActivity(ai)
err = mutableState.UpdateActivity(ai.ActivityId, func(activityInfo *persistencespb.ActivityInfo, _ workflow.MutableState) {
activityInfo.BuildIdInfo = &persistencespb.ActivityInfo_LastIndependentlyAssignedBuildId{LastIndependentlyAssignedBuildId: buildId}
})
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion service/history/workflow/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func PauseActivityById(mutableState MutableState, activityId string) error {
return nil
}

return mutableState.UpdateActivityWithCallback(ai.ActivityId, func(activityInfo *persistence.ActivityInfo, _ MutableState) {
return mutableState.UpdateActivity(ai.ActivityId, func(activityInfo *persistence.ActivityInfo, _ MutableState) {
// note - we are not increasing the stamp of the activity if it is running.
// this is because if activity is actually running we should let it finish
if GetActivityState(activityInfo) == enumspb.PENDING_ACTIVITY_STATE_SCHEDULED {
Expand Down
5 changes: 2 additions & 3 deletions service/history/workflow/mutable_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,9 +360,8 @@ type (
baseRunLowestCommonAncestorEventID int64,
baseRunLowestCommonAncestorEventVersion int64,
)
UpdateActivity(*persistencespb.ActivityInfo) error
UpdateActivityWithTimerHeartbeat(*persistencespb.ActivityInfo, time.Time) error
UpdateActivityWithCallback(string, UpdateActivityCallback) error
UpdateActivity(string, UpdateActivityCallback) error
UpdateActivityTimerHeartbeat(int64, time.Time)
UpdateActivityProgress(ai *persistencespb.ActivityInfo, request *workflowservice.RecordActivityTaskHeartbeatRequest)
UpdateUserTimer(*persistencespb.TimerInfo) error
UpdateCurrentVersion(version int64, forceUpdate bool) error
Expand Down
56 changes: 15 additions & 41 deletions service/history/workflow/mutable_state_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -1659,37 +1659,9 @@ func (ms *MutableStateImpl) UpdateActivityInfo(
return err
}

// UpdateActivity updates an activity
func (ms *MutableStateImpl) UpdateActivity(
ai *persistencespb.ActivityInfo,
) error {
prev, ok := ms.pendingActivityInfoIDs[ai.ScheduledEventId]
if !ok {
ms.logError(
fmt.Sprintf("unable to find activity ID: %v in mutable state", ai.ActivityId),
tag.ErrorTypeInvalidMutableStateAction,
)
return ErrMissingActivityInfo
}

ms.pendingActivityInfoIDs[ai.ScheduledEventId] = ai
ms.updateActivityInfos[ai.ScheduledEventId] = ai
ms.approximateSize += ai.Size() - prev.Size()
return nil
}

// UpdateActivityWithTimerHeartbeat updates an activity
func (ms *MutableStateImpl) UpdateActivityWithTimerHeartbeat(
ai *persistencespb.ActivityInfo,
timerTimeoutVisibility time.Time,
) error {
err := ms.UpdateActivity(ai)
if err != nil {
return err
}

ms.pendingActivityTimerHeartbeats[ai.ScheduledEventId] = timerTimeoutVisibility
return nil
func (ms *MutableStateImpl) UpdateActivityTimerHeartbeat(scheduledEventId int64, timerTimeoutVisibility time.Time) {
ms.pendingActivityTimerHeartbeats[scheduledEventId] = timerTimeoutVisibility
}

// DeleteActivity deletes details about an activity.
Expand Down Expand Up @@ -3045,14 +3017,16 @@ func (ms *MutableStateImpl) AddActivityTaskStartedEvent(
return nil, err
}

// we might need to retry, so do not append started event just yet,
// instead update mutable state and will record started event when activity task is closed
ai.Version = ms.GetCurrentVersion()
ai.StartedEventId = common.TransientEventID
ai.RequestId = requestID
ai.StartedTime = timestamppb.New(ms.timeSource.Now())
ai.StartedIdentity = identity
if err := ms.UpdateActivity(ai); err != nil {
if err := ms.UpdateActivity(ai.ActivityId, func(activityInfo *persistencespb.ActivityInfo, _ MutableState) {
// we might need to retry, so do not append started event just yet,
// instead update mutable state and will record started event when activity task is closed
activityInfo.Version = ms.GetCurrentVersion()
activityInfo.StartedEventId = common.TransientEventID
activityInfo.RequestId = requestID
activityInfo.StartedTime = timestamppb.New(ms.timeSource.Now())
activityInfo.StartedIdentity = identity

}); err != nil {
return nil, err
}
ms.syncActivityTasks[ai.ScheduledEventId] = struct{}{}
Expand Down Expand Up @@ -4927,7 +4901,7 @@ func (ms *MutableStateImpl) RetryActivity(
}

func (ms *MutableStateImpl) RecordLastActivityCompleteTime(ai *persistencespb.ActivityInfo) {
_ = ms.UpdateActivityWithCallback(ai.ActivityId, func(info *persistencespb.ActivityInfo, _ MutableState) {
_ = ms.UpdateActivity(ai.ActivityId, func(info *persistencespb.ActivityInfo, _ MutableState) {
ai.LastAttemptCompleteTime = timestamppb.New(ms.shard.GetTimeSource().Now().UTC())
})
}
Expand All @@ -4952,7 +4926,7 @@ func (ms *MutableStateImpl) updateActivityInfoForRetries(
nextAttempt int32,
activityFailure *failurepb.Failure,
) {
_ = ms.UpdateActivityWithCallback(ai.ActivityId, func(info *persistencespb.ActivityInfo, mutableState MutableState) {
_ = ms.UpdateActivity(ai.ActivityId, func(info *persistencespb.ActivityInfo, mutableState MutableState) {
mutableStateImpl, ok := mutableState.(*MutableStateImpl)
if ok {
ai = UpdateActivityInfoForRetries(
Expand All @@ -4966,7 +4940,7 @@ func (ms *MutableStateImpl) updateActivityInfoForRetries(
})
}

func (ms *MutableStateImpl) UpdateActivityWithCallback(activityId string, updateCallback UpdateActivityCallback) error {
func (ms *MutableStateImpl) UpdateActivity(activityId string, updateCallback UpdateActivityCallback) error {
ai, activityFound := ms.GetActivityByActivityID(activityId)
if !activityFound {
return consts.ErrActivityNotFound
Expand Down
36 changes: 10 additions & 26 deletions service/history/workflow/mutable_state_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 4 additions & 3 deletions service/history/workflow/task_refresher.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,12 +389,13 @@ func (r *TaskRefresherImpl) refreshTasksForActivity(
}

if CompareVersionedTransition(minVersionedTransition, EmptyVersionedTransition) == 0 { // Full refresh
// clear activity timer task mask for later activity timer task re-generation
activityInfo.TimerTaskStatus = TimerTaskStatusNone

// need to update activity timer task mask for which task is generated
if err := mutableState.UpdateActivity(
activityInfo,
activityInfo.ActivityId, func(ai *persistencespb.ActivityInfo, _ MutableState) {
// clear activity timer task mask for later activity timer task re-generation
activityInfo.TimerTaskStatus = TimerTaskStatusNone
},
); err != nil {
return err
}
Expand Down
15 changes: 7 additions & 8 deletions service/history/workflow/timer_sequence.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,15 +153,14 @@ func (t *timerSequenceImpl) CreateNextActivityTimer() (bool, error) {
if !ok {
return false, serviceerror.NewInternal(fmt.Sprintf("unable to load activity info %v", firstTimerTask.EventID))
}
// mark timer task mask as indication that timer task is generated
activityInfo.TimerTaskStatus |= timerTypeToTimerMask(firstTimerTask.TimerType)

var err error
if firstTimerTask.TimerType == enumspb.TIMEOUT_TYPE_HEARTBEAT {
err = t.mutableState.UpdateActivityWithTimerHeartbeat(activityInfo, firstTimerTask.Timestamp)
} else {
err = t.mutableState.UpdateActivity(activityInfo)
}
err := t.mutableState.UpdateActivity(activityInfo.ActivityId, func(ai *persistencespb.ActivityInfo, ms MutableState) {
// mark timer task mask as indication that timer task is generated
ai.TimerTaskStatus |= timerTypeToTimerMask(firstTimerTask.TimerType)
if firstTimerTask.TimerType == enumspb.TIMEOUT_TYPE_HEARTBEAT {
t.mutableState.UpdateActivityTimerHeartbeat(ai.ScheduledEventId, firstTimerTask.Timestamp)
}
})

if err != nil {
return false, err
Expand Down
Loading

0 comments on commit 67c2527

Please sign in to comment.