Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactoring: rename UpdateActivityWithCallback method on mutable state to UpdateActivity #6832

Merged
merged 5 commits into from
Nov 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion service/history/api/pauseactivity/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func (s *pauseActivitySuite) TestPauseActivityAcceptance() {

s.mockMutableState.EXPECT().IsWorkflowExecutionRunning().Return(true)
s.mockMutableState.EXPECT().GetActivityByActivityID(gomock.Any()).Return(activityInfo, true)
s.mockMutableState.EXPECT().UpdateActivityWithCallback(gomock.Any(), gomock.Any())
s.mockMutableState.EXPECT().UpdateActivity(gomock.Any(), gomock.Any())

err := workflow.PauseActivityById(s.mockMutableState, activityId)
s.NoError(err)
Expand Down
36 changes: 19 additions & 17 deletions service/history/api/updateactivityoptions/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"go.temporal.io/api/serviceerror"
taskqueuepb "go.temporal.io/api/taskqueue/v1"
"go.temporal.io/server/api/historyservice/v1"
persistencespb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common/definition"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/util"
Expand Down Expand Up @@ -141,23 +142,24 @@ func updateActivityOptions(
return nil, err
}

// update activity info with new options
ai.TaskQueue = adjustedOptions.TaskQueue.Name
ai.ScheduleToCloseTimeout = adjustedOptions.ScheduleToCloseTimeout
ai.ScheduleToStartTimeout = adjustedOptions.ScheduleToStartTimeout
ai.StartToCloseTimeout = adjustedOptions.StartToCloseTimeout
ai.HeartbeatTimeout = adjustedOptions.HeartbeatTimeout
ai.RetryMaximumInterval = adjustedOptions.RetryPolicy.MaximumInterval
ai.RetryBackoffCoefficient = adjustedOptions.RetryPolicy.BackoffCoefficient
ai.RetryMaximumInterval = adjustedOptions.RetryPolicy.MaximumInterval
ai.RetryMaximumAttempts = adjustedOptions.RetryPolicy.MaximumAttempts

// move forward activity version
ai.Stamp++

// invalidate timers
ai.TimerTaskStatus = workflow.TimerTaskStatusNone
if err := mutableState.UpdateActivity(ai); err != nil {
if err := mutableState.UpdateActivity(ai.ScheduledEventId, func(activityInfo *persistencespb.ActivityInfo, _ workflow.MutableState) {
// update activity info with new options
activityInfo.TaskQueue = adjustedOptions.TaskQueue.Name
activityInfo.ScheduleToCloseTimeout = adjustedOptions.ScheduleToCloseTimeout
activityInfo.ScheduleToStartTimeout = adjustedOptions.ScheduleToStartTimeout
activityInfo.StartToCloseTimeout = adjustedOptions.StartToCloseTimeout
activityInfo.HeartbeatTimeout = adjustedOptions.HeartbeatTimeout
activityInfo.RetryMaximumInterval = adjustedOptions.RetryPolicy.MaximumInterval
activityInfo.RetryBackoffCoefficient = adjustedOptions.RetryPolicy.BackoffCoefficient
activityInfo.RetryMaximumInterval = adjustedOptions.RetryPolicy.MaximumInterval
activityInfo.RetryMaximumAttempts = adjustedOptions.RetryPolicy.MaximumAttempts

// move forward activity version
activityInfo.Stamp++

// invalidate timers
activityInfo.TimerTaskStatus = workflow.TimerTaskStatusNone
}); err != nil {
return nil, err
}

Expand Down
2 changes: 1 addition & 1 deletion service/history/api/updateactivityoptions/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,7 @@ func (s *activityOptionsSuite) Test_updateActivityOptionsAcceptance() {
s.mockMutableState.EXPECT().IsWorkflowExecutionRunning().Return(true)
s.mockMutableState.EXPECT().GetActivityByActivityID(gomock.Any()).Return(fullActivityInfo, true)
s.mockMutableState.EXPECT().RegenerateActivityRetryTask(gomock.Any(), gomock.Any()).Return(nil)
s.mockMutableState.EXPECT().UpdateActivity(gomock.Any()).Return(nil)
s.mockMutableState.EXPECT().UpdateActivity(gomock.Any(), gomock.Any()).Return(nil)

request := &historyservice.UpdateActivityOptionsRequest{
UpdateRequest: &workflowservicepb.UpdateActivityOptionsByIdRequest{
Expand Down
10 changes: 6 additions & 4 deletions service/history/ndc/workflow_resetter.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
enumspb "go.temporal.io/api/enums/v1"
historypb "go.temporal.io/api/history/v1"
"go.temporal.io/api/serviceerror"
persistencespb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common"
"go.temporal.io/server/common/cluster"
"go.temporal.io/server/common/collection"
Expand Down Expand Up @@ -550,10 +551,11 @@ func (r *workflowResetterImpl) failInflightActivity(
switch ai.StartedEventId {
case common.EmptyEventID:
// activity not started, noop
// override the scheduled activity time to now
ai.ScheduledTime = timestamppb.New(now)
ai.FirstScheduledTime = timestamppb.New(now)
if err := mutableState.UpdateActivity(ai); err != nil {
if err := mutableState.UpdateActivity(ai.ScheduledEventId, func(activityInfo *persistencespb.ActivityInfo, _ workflow.MutableState) {
// override the scheduled activity time to now
activityInfo.ScheduledTime = timestamppb.New(now)
activityInfo.FirstScheduledTime = timestamppb.New(now)
}); err != nil {
return err
}

Expand Down
8 changes: 1 addition & 7 deletions service/history/ndc/workflow_resetter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,13 +493,7 @@ func (s *workflowResetterSuite) TestFailInflightActivity() {
activity1.LastWorkerVersionStamp,
).Return(&historypb.HistoryEvent{}, nil)

mutableState.EXPECT().UpdateActivity(&persistencespb.ActivityInfo{
Version: activity2.Version,
ScheduledEventId: activity2.ScheduledEventId,
ScheduledTime: timestamppb.New(now),
FirstScheduledTime: timestamppb.New(now),
StartedEventId: activity2.StartedEventId,
}).Return(nil)
mutableState.EXPECT().UpdateActivity(gomock.Any(), gomock.Any()).Return(nil)

err := s.workflowResetter.failInflightActivity(now, mutableState, terminateReason)
s.NoError(err)
Expand Down
8 changes: 5 additions & 3 deletions service/history/timer_queue_active_task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,10 +241,12 @@ func (t *timerQueueActiveTaskExecutor) executeActivityTimeoutTask(
// for updating workflow execution. In that case, only one new heartbeat timeout task should be
// created.
isHeartBeatTask := task.TimeoutType == enumspb.TIMEOUT_TYPE_HEARTBEAT
activityInfo, heartbeatTimeoutVis, ok := mutableState.GetActivityInfoWithTimerHeartbeat(task.EventID)
ai, heartbeatTimeoutVis, ok := mutableState.GetActivityInfoWithTimerHeartbeat(task.EventID)
if isHeartBeatTask && ok && queues.IsTimeExpired(task.GetVisibilityTime(), heartbeatTimeoutVis) {
activityInfo.TimerTaskStatus = activityInfo.TimerTaskStatus &^ workflow.TimerTaskStatusCreatedHeartbeat
if err := mutableState.UpdateActivity(activityInfo); err != nil {
err := mutableState.UpdateActivity(ai.ScheduledEventId, func(activityInfo *persistencespb.ActivityInfo, _ workflow.MutableState) {
activityInfo.TimerTaskStatus = activityInfo.TimerTaskStatus &^ workflow.TimerTaskStatusCreatedHeartbeat
})
if err != nil {
return err
}
updateMutableState = true
Expand Down
9 changes: 6 additions & 3 deletions service/history/timer_queue_standby_task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
taskqueuepb "go.temporal.io/api/taskqueue/v1"
enumsspb "go.temporal.io/server/api/enums/v1"
"go.temporal.io/server/api/matchingservice/v1"
persistencespb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
Expand Down Expand Up @@ -243,10 +244,12 @@ func (t *timerQueueStandbyTaskExecutor) executeActivityTimeoutTask(
// for updating workflow execution. In that case, only one new heartbeat timeout task should be
// created.
isHeartBeatTask := timerTask.TimeoutType == enumspb.TIMEOUT_TYPE_HEARTBEAT
activityInfo, heartbeatTimeoutVis, ok := mutableState.GetActivityInfoWithTimerHeartbeat(timerTask.EventID)
ai, heartbeatTimeoutVis, ok := mutableState.GetActivityInfoWithTimerHeartbeat(timerTask.EventID)
if isHeartBeatTask && ok && queues.IsTimeExpired(timerTask.GetVisibilityTime(), heartbeatTimeoutVis) {
activityInfo.TimerTaskStatus = activityInfo.TimerTaskStatus &^ workflow.TimerTaskStatusCreatedHeartbeat
if err := mutableState.UpdateActivity(activityInfo); err != nil {
err := mutableState.UpdateActivity(ai.ScheduledEventId, func(activityInfo *persistencespb.ActivityInfo, _ workflow.MutableState) {
activityInfo.TimerTaskStatus = activityInfo.TimerTaskStatus &^ workflow.TimerTaskStatusCreatedHeartbeat
})
if err != nil {
return nil, err
}
updateMutableState = true
Expand Down
5 changes: 3 additions & 2 deletions service/history/worker_versioning_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,9 @@ func updateIndependentActivityBuildId(
return err
}

ai.BuildIdInfo = &persistencespb.ActivityInfo_LastIndependentlyAssignedBuildId{LastIndependentlyAssignedBuildId: buildId}
err = mutableState.UpdateActivity(ai)
err = mutableState.UpdateActivity(ai.ScheduledEventId, func(activityInfo *persistencespb.ActivityInfo, _ workflow.MutableState) {
activityInfo.BuildIdInfo = &persistencespb.ActivityInfo_LastIndependentlyAssignedBuildId{LastIndependentlyAssignedBuildId: buildId}
})
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion service/history/workflow/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func PauseActivityById(mutableState MutableState, activityId string) error {
return nil
}

return mutableState.UpdateActivityWithCallback(ai.ActivityId, func(activityInfo *persistence.ActivityInfo, _ MutableState) {
return mutableState.UpdateActivity(ai.ScheduledEventId, func(activityInfo *persistence.ActivityInfo, _ MutableState) {
// note - we are not increasing the stamp of the activity if it is running.
// this is because if activity is actually running we should let it finish
if GetActivityState(activityInfo) == enumspb.PENDING_ACTIVITY_STATE_SCHEDULED {
Expand Down
7 changes: 3 additions & 4 deletions service/history/workflow/mutable_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ type (
MaxSearchAttributeValueSize int
}

UpdateActivityCallback func(*persistencespb.ActivityInfo, MutableState)
ActivityUpdater func(*persistencespb.ActivityInfo, MutableState)

MutableState interface {
callbacks.CanGetNexusCompletion
Expand Down Expand Up @@ -360,9 +360,8 @@ type (
baseRunLowestCommonAncestorEventID int64,
baseRunLowestCommonAncestorEventVersion int64,
)
UpdateActivity(*persistencespb.ActivityInfo) error
UpdateActivityWithTimerHeartbeat(*persistencespb.ActivityInfo, time.Time) error
UpdateActivityWithCallback(string, UpdateActivityCallback) error
UpdateActivity(int64, ActivityUpdater) error
UpdateActivityTimerHeartbeat(int64, time.Time)
UpdateActivityProgress(ai *persistencespb.ActivityInfo, request *workflowservice.RecordActivityTaskHeartbeatRequest)
UpdateUserTimer(*persistencespb.TimerInfo) error
UpdateCurrentVersion(version int64, forceUpdate bool) error
Expand Down
72 changes: 28 additions & 44 deletions service/history/workflow/mutable_state_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -1659,37 +1659,9 @@ func (ms *MutableStateImpl) UpdateActivityInfo(
return err
}

// UpdateActivity updates an activity
func (ms *MutableStateImpl) UpdateActivity(
ai *persistencespb.ActivityInfo,
) error {
prev, ok := ms.pendingActivityInfoIDs[ai.ScheduledEventId]
if !ok {
ms.logError(
fmt.Sprintf("unable to find activity ID: %v in mutable state", ai.ActivityId),
tag.ErrorTypeInvalidMutableStateAction,
)
return ErrMissingActivityInfo
}

ms.pendingActivityInfoIDs[ai.ScheduledEventId] = ai
ms.updateActivityInfos[ai.ScheduledEventId] = ai
ms.approximateSize += ai.Size() - prev.Size()
return nil
}

// UpdateActivityWithTimerHeartbeat updates an activity
func (ms *MutableStateImpl) UpdateActivityWithTimerHeartbeat(
ai *persistencespb.ActivityInfo,
timerTimeoutVisibility time.Time,
) error {
err := ms.UpdateActivity(ai)
if err != nil {
return err
}

ms.pendingActivityTimerHeartbeats[ai.ScheduledEventId] = timerTimeoutVisibility
return nil
func (ms *MutableStateImpl) UpdateActivityTimerHeartbeat(scheduledEventId int64, timerTimeoutVisibility time.Time) {
ms.pendingActivityTimerHeartbeats[scheduledEventId] = timerTimeoutVisibility
}

// DeleteActivity deletes details about an activity.
Expand Down Expand Up @@ -3045,14 +3017,16 @@ func (ms *MutableStateImpl) AddActivityTaskStartedEvent(
return nil, err
}

// we might need to retry, so do not append started event just yet,
// instead update mutable state and will record started event when activity task is closed
ai.Version = ms.GetCurrentVersion()
ai.StartedEventId = common.TransientEventID
ai.RequestId = requestID
ai.StartedTime = timestamppb.New(ms.timeSource.Now())
ai.StartedIdentity = identity
if err := ms.UpdateActivity(ai); err != nil {
if err := ms.UpdateActivity(ai.ScheduledEventId, func(activityInfo *persistencespb.ActivityInfo, _ MutableState) {
// we might need to retry, so do not append started event just yet,
// instead update mutable state and will record started event when activity task is closed
activityInfo.Version = ms.GetCurrentVersion()
activityInfo.StartedEventId = common.TransientEventID
activityInfo.RequestId = requestID
activityInfo.StartedTime = timestamppb.New(ms.timeSource.Now())
activityInfo.StartedIdentity = identity

}); err != nil {
return nil, err
}
ms.syncActivityTasks[ai.ScheduledEventId] = struct{}{}
Expand Down Expand Up @@ -4927,7 +4901,7 @@ func (ms *MutableStateImpl) RetryActivity(
}

func (ms *MutableStateImpl) RecordLastActivityCompleteTime(ai *persistencespb.ActivityInfo) {
_ = ms.UpdateActivityWithCallback(ai.ActivityId, func(info *persistencespb.ActivityInfo, _ MutableState) {
_ = ms.UpdateActivity(ai.ScheduledEventId, func(info *persistencespb.ActivityInfo, _ MutableState) {
ai.LastAttemptCompleteTime = timestamppb.New(ms.shard.GetTimeSource().Now().UTC())
})
}
Expand All @@ -4952,11 +4926,11 @@ func (ms *MutableStateImpl) updateActivityInfoForRetries(
nextAttempt int32,
activityFailure *failurepb.Failure,
) {
_ = ms.UpdateActivityWithCallback(ai.ActivityId, func(info *persistencespb.ActivityInfo, mutableState MutableState) {
_ = ms.UpdateActivity(ai.ScheduledEventId, func(activityInfo *persistencespb.ActivityInfo, mutableState MutableState) {
mutableStateImpl, ok := mutableState.(*MutableStateImpl)
if ok {
ai = UpdateActivityInfoForRetries(
ai,
activityInfo,
mutableStateImpl.GetCurrentVersion(),
nextAttempt,
mutableStateImpl.truncateRetryableActivityFailure(activityFailure),
Expand All @@ -4966,8 +4940,18 @@ func (ms *MutableStateImpl) updateActivityInfoForRetries(
})
}

func (ms *MutableStateImpl) UpdateActivityWithCallback(activityId string, updateCallback UpdateActivityCallback) error {
ai, activityFound := ms.GetActivityByActivityID(activityId)
/*
UpdateActivity function updates the existing pending activity via the updater callback.
To update an activity, we need to do the following steps:
* preserve the original size of the activity
* preserve the original state of the activity
* call the updater callback to update the activity
* calculate new size of the activity
* respond to the changes of the activity state (like changes to pause)
*/

func (ms *MutableStateImpl) UpdateActivity(scheduledEventId int64, updater ActivityUpdater) error {
ai, activityFound := ms.GetActivityInfo(scheduledEventId)
if !activityFound {
return consts.ErrActivityNotFound
}
Expand All @@ -4979,7 +4963,7 @@ func (ms *MutableStateImpl) UpdateActivityWithCallback(activityId string, update
prevPause = prev.Paused
}

updateCallback(ai, ms)
updater(ai, ms)

if prevPause != ai.Paused {
err := ms.updatePauseInfoSearchAttribute()
Expand Down
36 changes: 10 additions & 26 deletions service/history/workflow/mutable_state_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 4 additions & 3 deletions service/history/workflow/task_refresher.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,12 +389,13 @@ func (r *TaskRefresherImpl) refreshTasksForActivity(
}

if CompareVersionedTransition(minVersionedTransition, EmptyVersionedTransition) == 0 { // Full refresh
// clear activity timer task mask for later activity timer task re-generation
activityInfo.TimerTaskStatus = TimerTaskStatusNone

// need to update activity timer task mask for which task is generated
if err := mutableState.UpdateActivity(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cc @xwduan is adding a dedicated method for updating timer task status since mutable state needs to know if the updated field supposed to be replicated.

activityInfo,
activityInfo.ScheduledEventId, func(ai *persistencespb.ActivityInfo, _ MutableState) {
// clear activity timer task mask for later activity timer task re-generation
activityInfo.TimerTaskStatus = TimerTaskStatusNone
},
); err != nil {
return err
}
Expand Down
Loading
Loading