Skip to content

Commit

Permalink
Refactoring: rename UpdateActivityWithCallback method on mutable stat…
Browse files Browse the repository at this point in the history
…e to UpdateActivity (#6832)

## What changed?
<!-- Describe what has changed in this PR -->
1. I change every UpdateActivity to UpdateActivityWithCallback
everywhere in the code.
2. I removed old UpdateActivity
3. I rename UpdateActivityWithCallback to UpdateActivity
4. Fix tests, etc.

## Why?
<!-- Tell your future self why have you made these changes -->
Old UpdateActivity was not actually working. ActivityInfo in pending
activities is a pointer, so calculating size was wrong. Few other
reasons.

## How did you test it?
unit tests

## Risks
With this change the size of mutable state is calculated properly. This
means that in some cases it may change, thus becoming larger then
before, and trigger some errors.

## Documentation
<!-- Have you made sure this change doesn't falsify anything currently
stated in `docs/`? If significant
new behavior is added, have you described that in `docs/`? -->

## Is hotfix candidate?
<!-- Is this PR a hotfix candidate or does it require a notification to
be sent to the broader community? (Yes/No) -->
No

---------

Co-authored-by: Alex Shtin <[email protected]>
  • Loading branch information
ychebotarev and alexshtin authored Nov 18, 2024
1 parent 406b681 commit 3a1b707
Show file tree
Hide file tree
Showing 15 changed files with 99 additions and 128 deletions.
2 changes: 1 addition & 1 deletion service/history/api/pauseactivity/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func (s *pauseActivitySuite) TestPauseActivityAcceptance() {

s.mockMutableState.EXPECT().IsWorkflowExecutionRunning().Return(true)
s.mockMutableState.EXPECT().GetActivityByActivityID(gomock.Any()).Return(activityInfo, true)
s.mockMutableState.EXPECT().UpdateActivityWithCallback(gomock.Any(), gomock.Any())
s.mockMutableState.EXPECT().UpdateActivity(gomock.Any(), gomock.Any())

err := workflow.PauseActivityById(s.mockMutableState, activityId)
s.NoError(err)
Expand Down
36 changes: 19 additions & 17 deletions service/history/api/updateactivityoptions/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"go.temporal.io/api/serviceerror"
taskqueuepb "go.temporal.io/api/taskqueue/v1"
"go.temporal.io/server/api/historyservice/v1"
persistencespb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common/definition"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/util"
Expand Down Expand Up @@ -141,23 +142,24 @@ func updateActivityOptions(
return nil, err
}

// update activity info with new options
ai.TaskQueue = adjustedOptions.TaskQueue.Name
ai.ScheduleToCloseTimeout = adjustedOptions.ScheduleToCloseTimeout
ai.ScheduleToStartTimeout = adjustedOptions.ScheduleToStartTimeout
ai.StartToCloseTimeout = adjustedOptions.StartToCloseTimeout
ai.HeartbeatTimeout = adjustedOptions.HeartbeatTimeout
ai.RetryMaximumInterval = adjustedOptions.RetryPolicy.MaximumInterval
ai.RetryBackoffCoefficient = adjustedOptions.RetryPolicy.BackoffCoefficient
ai.RetryMaximumInterval = adjustedOptions.RetryPolicy.MaximumInterval
ai.RetryMaximumAttempts = adjustedOptions.RetryPolicy.MaximumAttempts

// move forward activity version
ai.Stamp++

// invalidate timers
ai.TimerTaskStatus = workflow.TimerTaskStatusNone
if err := mutableState.UpdateActivity(ai); err != nil {
if err := mutableState.UpdateActivity(ai.ScheduledEventId, func(activityInfo *persistencespb.ActivityInfo, _ workflow.MutableState) {
// update activity info with new options
activityInfo.TaskQueue = adjustedOptions.TaskQueue.Name
activityInfo.ScheduleToCloseTimeout = adjustedOptions.ScheduleToCloseTimeout
activityInfo.ScheduleToStartTimeout = adjustedOptions.ScheduleToStartTimeout
activityInfo.StartToCloseTimeout = adjustedOptions.StartToCloseTimeout
activityInfo.HeartbeatTimeout = adjustedOptions.HeartbeatTimeout
activityInfo.RetryMaximumInterval = adjustedOptions.RetryPolicy.MaximumInterval
activityInfo.RetryBackoffCoefficient = adjustedOptions.RetryPolicy.BackoffCoefficient
activityInfo.RetryMaximumInterval = adjustedOptions.RetryPolicy.MaximumInterval
activityInfo.RetryMaximumAttempts = adjustedOptions.RetryPolicy.MaximumAttempts

// move forward activity version
activityInfo.Stamp++

// invalidate timers
activityInfo.TimerTaskStatus = workflow.TimerTaskStatusNone
}); err != nil {
return nil, err
}

Expand Down
2 changes: 1 addition & 1 deletion service/history/api/updateactivityoptions/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,7 @@ func (s *activityOptionsSuite) Test_updateActivityOptionsAcceptance() {
s.mockMutableState.EXPECT().IsWorkflowExecutionRunning().Return(true)
s.mockMutableState.EXPECT().GetActivityByActivityID(gomock.Any()).Return(fullActivityInfo, true)
s.mockMutableState.EXPECT().RegenerateActivityRetryTask(gomock.Any(), gomock.Any()).Return(nil)
s.mockMutableState.EXPECT().UpdateActivity(gomock.Any()).Return(nil)
s.mockMutableState.EXPECT().UpdateActivity(gomock.Any(), gomock.Any()).Return(nil)

request := &historyservice.UpdateActivityOptionsRequest{
UpdateRequest: &workflowservicepb.UpdateActivityOptionsByIdRequest{
Expand Down
10 changes: 6 additions & 4 deletions service/history/ndc/workflow_resetter.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
enumspb "go.temporal.io/api/enums/v1"
historypb "go.temporal.io/api/history/v1"
"go.temporal.io/api/serviceerror"
persistencespb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common"
"go.temporal.io/server/common/cluster"
"go.temporal.io/server/common/collection"
Expand Down Expand Up @@ -550,10 +551,11 @@ func (r *workflowResetterImpl) failInflightActivity(
switch ai.StartedEventId {
case common.EmptyEventID:
// activity not started, noop
// override the scheduled activity time to now
ai.ScheduledTime = timestamppb.New(now)
ai.FirstScheduledTime = timestamppb.New(now)
if err := mutableState.UpdateActivity(ai); err != nil {
if err := mutableState.UpdateActivity(ai.ScheduledEventId, func(activityInfo *persistencespb.ActivityInfo, _ workflow.MutableState) {
// override the scheduled activity time to now
activityInfo.ScheduledTime = timestamppb.New(now)
activityInfo.FirstScheduledTime = timestamppb.New(now)
}); err != nil {
return err
}

Expand Down
8 changes: 1 addition & 7 deletions service/history/ndc/workflow_resetter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,13 +493,7 @@ func (s *workflowResetterSuite) TestFailInflightActivity() {
activity1.LastWorkerVersionStamp,
).Return(&historypb.HistoryEvent{}, nil)

mutableState.EXPECT().UpdateActivity(&persistencespb.ActivityInfo{
Version: activity2.Version,
ScheduledEventId: activity2.ScheduledEventId,
ScheduledTime: timestamppb.New(now),
FirstScheduledTime: timestamppb.New(now),
StartedEventId: activity2.StartedEventId,
}).Return(nil)
mutableState.EXPECT().UpdateActivity(gomock.Any(), gomock.Any()).Return(nil)

err := s.workflowResetter.failInflightActivity(now, mutableState, terminateReason)
s.NoError(err)
Expand Down
8 changes: 5 additions & 3 deletions service/history/timer_queue_active_task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,10 +241,12 @@ func (t *timerQueueActiveTaskExecutor) executeActivityTimeoutTask(
// for updating workflow execution. In that case, only one new heartbeat timeout task should be
// created.
isHeartBeatTask := task.TimeoutType == enumspb.TIMEOUT_TYPE_HEARTBEAT
activityInfo, heartbeatTimeoutVis, ok := mutableState.GetActivityInfoWithTimerHeartbeat(task.EventID)
ai, heartbeatTimeoutVis, ok := mutableState.GetActivityInfoWithTimerHeartbeat(task.EventID)
if isHeartBeatTask && ok && queues.IsTimeExpired(task.GetVisibilityTime(), heartbeatTimeoutVis) {
activityInfo.TimerTaskStatus = activityInfo.TimerTaskStatus &^ workflow.TimerTaskStatusCreatedHeartbeat
if err := mutableState.UpdateActivity(activityInfo); err != nil {
err := mutableState.UpdateActivity(ai.ScheduledEventId, func(activityInfo *persistencespb.ActivityInfo, _ workflow.MutableState) {
activityInfo.TimerTaskStatus = activityInfo.TimerTaskStatus &^ workflow.TimerTaskStatusCreatedHeartbeat
})
if err != nil {
return err
}
updateMutableState = true
Expand Down
9 changes: 6 additions & 3 deletions service/history/timer_queue_standby_task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
taskqueuepb "go.temporal.io/api/taskqueue/v1"
enumsspb "go.temporal.io/server/api/enums/v1"
"go.temporal.io/server/api/matchingservice/v1"
persistencespb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
Expand Down Expand Up @@ -243,10 +244,12 @@ func (t *timerQueueStandbyTaskExecutor) executeActivityTimeoutTask(
// for updating workflow execution. In that case, only one new heartbeat timeout task should be
// created.
isHeartBeatTask := timerTask.TimeoutType == enumspb.TIMEOUT_TYPE_HEARTBEAT
activityInfo, heartbeatTimeoutVis, ok := mutableState.GetActivityInfoWithTimerHeartbeat(timerTask.EventID)
ai, heartbeatTimeoutVis, ok := mutableState.GetActivityInfoWithTimerHeartbeat(timerTask.EventID)
if isHeartBeatTask && ok && queues.IsTimeExpired(timerTask.GetVisibilityTime(), heartbeatTimeoutVis) {
activityInfo.TimerTaskStatus = activityInfo.TimerTaskStatus &^ workflow.TimerTaskStatusCreatedHeartbeat
if err := mutableState.UpdateActivity(activityInfo); err != nil {
err := mutableState.UpdateActivity(ai.ScheduledEventId, func(activityInfo *persistencespb.ActivityInfo, _ workflow.MutableState) {
activityInfo.TimerTaskStatus = activityInfo.TimerTaskStatus &^ workflow.TimerTaskStatusCreatedHeartbeat
})
if err != nil {
return nil, err
}
updateMutableState = true
Expand Down
5 changes: 3 additions & 2 deletions service/history/worker_versioning_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,9 @@ func updateIndependentActivityBuildId(
return err
}

ai.BuildIdInfo = &persistencespb.ActivityInfo_LastIndependentlyAssignedBuildId{LastIndependentlyAssignedBuildId: buildId}
err = mutableState.UpdateActivity(ai)
err = mutableState.UpdateActivity(ai.ScheduledEventId, func(activityInfo *persistencespb.ActivityInfo, _ workflow.MutableState) {
activityInfo.BuildIdInfo = &persistencespb.ActivityInfo_LastIndependentlyAssignedBuildId{LastIndependentlyAssignedBuildId: buildId}
})
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion service/history/workflow/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func PauseActivityById(mutableState MutableState, activityId string) error {
return nil
}

return mutableState.UpdateActivityWithCallback(ai.ActivityId, func(activityInfo *persistence.ActivityInfo, _ MutableState) {
return mutableState.UpdateActivity(ai.ScheduledEventId, func(activityInfo *persistence.ActivityInfo, _ MutableState) {
// note - we are not increasing the stamp of the activity if it is running.
// this is because if activity is actually running we should let it finish
if GetActivityState(activityInfo) == enumspb.PENDING_ACTIVITY_STATE_SCHEDULED {
Expand Down
7 changes: 3 additions & 4 deletions service/history/workflow/mutable_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ type (
MaxSearchAttributeValueSize int
}

UpdateActivityCallback func(*persistencespb.ActivityInfo, MutableState)
ActivityUpdater func(*persistencespb.ActivityInfo, MutableState)

MutableState interface {
callbacks.CanGetNexusCompletion
Expand Down Expand Up @@ -360,9 +360,8 @@ type (
baseRunLowestCommonAncestorEventID int64,
baseRunLowestCommonAncestorEventVersion int64,
)
UpdateActivity(*persistencespb.ActivityInfo) error
UpdateActivityWithTimerHeartbeat(*persistencespb.ActivityInfo, time.Time) error
UpdateActivityWithCallback(string, UpdateActivityCallback) error
UpdateActivity(int64, ActivityUpdater) error
UpdateActivityTimerHeartbeat(int64, time.Time)
UpdateActivityProgress(ai *persistencespb.ActivityInfo, request *workflowservice.RecordActivityTaskHeartbeatRequest)
UpdateUserTimer(*persistencespb.TimerInfo) error
UpdateCurrentVersion(version int64, forceUpdate bool) error
Expand Down
72 changes: 28 additions & 44 deletions service/history/workflow/mutable_state_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -1659,37 +1659,9 @@ func (ms *MutableStateImpl) UpdateActivityInfo(
return err
}

// UpdateActivity updates an activity
func (ms *MutableStateImpl) UpdateActivity(
ai *persistencespb.ActivityInfo,
) error {
prev, ok := ms.pendingActivityInfoIDs[ai.ScheduledEventId]
if !ok {
ms.logError(
fmt.Sprintf("unable to find activity ID: %v in mutable state", ai.ActivityId),
tag.ErrorTypeInvalidMutableStateAction,
)
return ErrMissingActivityInfo
}

ms.pendingActivityInfoIDs[ai.ScheduledEventId] = ai
ms.updateActivityInfos[ai.ScheduledEventId] = ai
ms.approximateSize += ai.Size() - prev.Size()
return nil
}

// UpdateActivityWithTimerHeartbeat updates an activity
func (ms *MutableStateImpl) UpdateActivityWithTimerHeartbeat(
ai *persistencespb.ActivityInfo,
timerTimeoutVisibility time.Time,
) error {
err := ms.UpdateActivity(ai)
if err != nil {
return err
}

ms.pendingActivityTimerHeartbeats[ai.ScheduledEventId] = timerTimeoutVisibility
return nil
func (ms *MutableStateImpl) UpdateActivityTimerHeartbeat(scheduledEventId int64, timerTimeoutVisibility time.Time) {
ms.pendingActivityTimerHeartbeats[scheduledEventId] = timerTimeoutVisibility
}

// DeleteActivity deletes details about an activity.
Expand Down Expand Up @@ -3045,14 +3017,16 @@ func (ms *MutableStateImpl) AddActivityTaskStartedEvent(
return nil, err
}

// we might need to retry, so do not append started event just yet,
// instead update mutable state and will record started event when activity task is closed
ai.Version = ms.GetCurrentVersion()
ai.StartedEventId = common.TransientEventID
ai.RequestId = requestID
ai.StartedTime = timestamppb.New(ms.timeSource.Now())
ai.StartedIdentity = identity
if err := ms.UpdateActivity(ai); err != nil {
if err := ms.UpdateActivity(ai.ScheduledEventId, func(activityInfo *persistencespb.ActivityInfo, _ MutableState) {
// we might need to retry, so do not append started event just yet,
// instead update mutable state and will record started event when activity task is closed
activityInfo.Version = ms.GetCurrentVersion()
activityInfo.StartedEventId = common.TransientEventID
activityInfo.RequestId = requestID
activityInfo.StartedTime = timestamppb.New(ms.timeSource.Now())
activityInfo.StartedIdentity = identity

}); err != nil {
return nil, err
}
ms.syncActivityTasks[ai.ScheduledEventId] = struct{}{}
Expand Down Expand Up @@ -4927,7 +4901,7 @@ func (ms *MutableStateImpl) RetryActivity(
}

func (ms *MutableStateImpl) RecordLastActivityCompleteTime(ai *persistencespb.ActivityInfo) {
_ = ms.UpdateActivityWithCallback(ai.ActivityId, func(info *persistencespb.ActivityInfo, _ MutableState) {
_ = ms.UpdateActivity(ai.ScheduledEventId, func(info *persistencespb.ActivityInfo, _ MutableState) {
ai.LastAttemptCompleteTime = timestamppb.New(ms.shard.GetTimeSource().Now().UTC())
})
}
Expand All @@ -4952,11 +4926,11 @@ func (ms *MutableStateImpl) updateActivityInfoForRetries(
nextAttempt int32,
activityFailure *failurepb.Failure,
) {
_ = ms.UpdateActivityWithCallback(ai.ActivityId, func(info *persistencespb.ActivityInfo, mutableState MutableState) {
_ = ms.UpdateActivity(ai.ScheduledEventId, func(activityInfo *persistencespb.ActivityInfo, mutableState MutableState) {
mutableStateImpl, ok := mutableState.(*MutableStateImpl)
if ok {
ai = UpdateActivityInfoForRetries(
ai,
activityInfo,
mutableStateImpl.GetCurrentVersion(),
nextAttempt,
mutableStateImpl.truncateRetryableActivityFailure(activityFailure),
Expand All @@ -4966,8 +4940,18 @@ func (ms *MutableStateImpl) updateActivityInfoForRetries(
})
}

func (ms *MutableStateImpl) UpdateActivityWithCallback(activityId string, updateCallback UpdateActivityCallback) error {
ai, activityFound := ms.GetActivityByActivityID(activityId)
/*
UpdateActivity function updates the existing pending activity via the updater callback.
To update an activity, we need to do the following steps:
* preserve the original size of the activity
* preserve the original state of the activity
* call the updater callback to update the activity
* calculate new size of the activity
* respond to the changes of the activity state (like changes to pause)
*/

func (ms *MutableStateImpl) UpdateActivity(scheduledEventId int64, updater ActivityUpdater) error {
ai, activityFound := ms.GetActivityInfo(scheduledEventId)
if !activityFound {
return consts.ErrActivityNotFound
}
Expand All @@ -4979,7 +4963,7 @@ func (ms *MutableStateImpl) UpdateActivityWithCallback(activityId string, update
prevPause = prev.Paused
}

updateCallback(ai, ms)
updater(ai, ms)

if prevPause != ai.Paused {
err := ms.updatePauseInfoSearchAttribute()
Expand Down
36 changes: 10 additions & 26 deletions service/history/workflow/mutable_state_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 4 additions & 3 deletions service/history/workflow/task_refresher.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,12 +389,13 @@ func (r *TaskRefresherImpl) refreshTasksForActivity(
}

if CompareVersionedTransition(minVersionedTransition, EmptyVersionedTransition) == 0 { // Full refresh
// clear activity timer task mask for later activity timer task re-generation
activityInfo.TimerTaskStatus = TimerTaskStatusNone

// need to update activity timer task mask for which task is generated
if err := mutableState.UpdateActivity(
activityInfo,
activityInfo.ScheduledEventId, func(ai *persistencespb.ActivityInfo, _ MutableState) {
// clear activity timer task mask for later activity timer task re-generation
activityInfo.TimerTaskStatus = TimerTaskStatusNone
},
); err != nil {
return err
}
Expand Down
Loading

0 comments on commit 3a1b707

Please sign in to comment.