From 4ec6fb7e0fa5231ec24305ef8753b550f3c6d5f4 Mon Sep 17 00:00:00 2001 From: Alex Shtin Date: Thu, 11 May 2023 14:08:05 -0700 Subject: [PATCH] Fix mutable state access after workflow lock is released --- service/history/api/updateworkflow/api.go | 134 +++++++++++++--------- 1 file changed, 77 insertions(+), 57 deletions(-) diff --git a/service/history/api/updateworkflow/api.go b/service/history/api/updateworkflow/api.go index 97781b145ab8..fb0cf4e8e912 100644 --- a/service/history/api/updateworkflow/api.go +++ b/service/history/api/updateworkflow/api.go @@ -27,6 +27,7 @@ package updateworkflow import ( "context" "fmt" + "time" commonpb "go.temporal.io/api/common/v1" "go.temporal.io/api/serviceerror" @@ -78,66 +79,86 @@ func Invoke( fmt.Sprintf("%v is not implemented", waitStage)) } - weCtx, err := workflowConsistencyChecker.GetWorkflowContext( - ctx, - nil, - api.BypassMutableStateConsistencyPredicate, - definition.NewWorkflowKey( - req.NamespaceId, - req.Request.WorkflowExecution.WorkflowId, - req.Request.WorkflowExecution.RunId, - ), - workflow.LockPriorityHigh, + wfKey := definition.NewWorkflowKey( + req.NamespaceId, + req.Request.WorkflowExecution.WorkflowId, + req.Request.WorkflowExecution.RunId, ) - if err != nil { - return nil, err - } - defer func() { weCtx.GetReleaseFn()(retErr) }() - ms := weCtx.GetMutableState() - if !ms.IsWorkflowExecutionRunning() { - return nil, consts.ErrWorkflowCompleted - } - - if req.GetRequest().GetFirstExecutionRunId() != "" && ms.GetExecutionInfo().GetFirstExecutionRunId() != req.GetRequest().GetFirstExecutionRunId() { - return nil, consts.ErrWorkflowExecutionNotFound - } + // Wrapping workflow context related operation in separate func to prevent usage of its fields (including any mutable state fields) + // outside of this func after workflow lock is released. + // It is important to release workflow lock before calling matching. + wfCtxOperation := func(wfKey definition.WorkflowKey) ( + upd *update.Update, + newWorkflowTask *workflow.WorkflowTaskInfo, + scheduleToStartTimeout *time.Duration, + retErr error) { + + weCtx, err := workflowConsistencyChecker.GetWorkflowContext( + ctx, + nil, + api.BypassMutableStateConsistencyPredicate, + wfKey, + workflow.LockPriorityHigh, + ) + if err != nil { + retErr = err + return + } + defer func() { weCtx.GetReleaseFn()(retErr) }() - updateID := req.GetRequest().GetRequest().GetMeta().GetUpdateId() - updateReg := weCtx.GetUpdateRegistry(ctx) - upd, alreadyExisted, err := updateReg.FindOrCreate(ctx, updateID) - if err != nil { - return nil, err - } - if err := upd.OnMessage(ctx, req.GetRequest().GetRequest(), workflow.WithEffects(effect.Immediate(ctx), ms)); err != nil { - return nil, err - } + ms := weCtx.GetMutableState() + if !ms.IsWorkflowExecutionRunning() { + retErr = consts.ErrWorkflowCompleted + return + } - // If WT is scheduled, but not started, updates will be attached to it, when WT is started. - // If WT has already started, new speculative WT will be created when started WT completes. - // If update is duplicate, then WT for this update was already created. - createNewWorkflowTask := !ms.HasPendingWorkflowTask() && !alreadyExisted + if req.GetRequest().GetFirstExecutionRunId() != "" && ms.GetExecutionInfo().GetFirstExecutionRunId() != req.GetRequest().GetFirstExecutionRunId() { + retErr = consts.ErrWorkflowExecutionNotFound + return + } - if createNewWorkflowTask { - // This will try not to add an event but will create speculative WT in mutable state. - wt, err := ms.AddWorkflowTaskScheduledEvent(false, enumsspb.WORKFLOW_TASK_TYPE_SPECULATIVE) - if err != nil { - return nil, err + updateID := req.GetRequest().GetRequest().GetMeta().GetUpdateId() + updateReg := weCtx.GetUpdateRegistry(ctx) + var alreadyExisted bool + if upd, alreadyExisted, retErr = updateReg.FindOrCreate(ctx, updateID); retErr != nil { + return } - if wt.Type != enumsspb.WORKFLOW_TASK_TYPE_SPECULATIVE { - // This should never happen because WT is created as normal (despite speculative is requested) - // only if there were buffered events and because there were no pending WT, there can't be buffered events. - return nil, consts.ErrWorkflowTaskStateInconsistent + if retErr = upd.OnMessage(ctx, req.GetRequest().GetRequest(), workflow.WithEffects(effect.Immediate(ctx), ms)); retErr != nil { + return } - // It is important to release workflow lock before calling matching. - weCtx.GetReleaseFn()(nil) - err = addWorkflowTaskToMatching(ctx, shardCtx, ms, matchingClient, wt, namespace.ID(req.GetNamespaceId())) + // If WT is scheduled, but not started, updates will be attached to it, when WT is started. + // If WT has already started, new speculative WT will be created when started WT completes. + // If update is duplicate, then WT for this update was already created. + createNewWorkflowTask := !ms.HasPendingWorkflowTask() && !alreadyExisted + + if createNewWorkflowTask { + // This will try not to add an event but will create speculative WT in mutable state. + if newWorkflowTask, retErr = ms.AddWorkflowTaskScheduledEvent(false, enumsspb.WORKFLOW_TASK_TYPE_SPECULATIVE); retErr != nil { + return + } + if newWorkflowTask.Type != enumsspb.WORKFLOW_TASK_TYPE_SPECULATIVE { + // This should never happen because WT is created as normal (despite speculative is requested) + // only if there were buffered events and because there were no pending WT, there can't be buffered events. + retErr = consts.ErrWorkflowTaskStateInconsistent + return + } + + _, scheduleToStartTimeout = ms.TaskQueueScheduleToStartTimeout(ms.CurrentTaskQueue().Name) + } + return + } + upd, newWorkflowTask, scheduleToStartTimeout, err := wfCtxOperation(wfKey) + if err != nil { + return nil, err + } + + if newWorkflowTask != nil { + err = addWorkflowTaskToMatching(ctx, wfKey, newWorkflowTask, scheduleToStartTimeout, namespace.ID(req.GetNamespaceId()), shardCtx, matchingClient) if err != nil { return nil, err } - } else { - weCtx.GetReleaseFn()(nil) } updOutcome, err := waitLifecycleStage(ctx, upd) @@ -148,8 +169,8 @@ func Invoke( Response: &workflowservice.UpdateWorkflowExecutionResponse{ UpdateRef: &updatepb.UpdateRef{ WorkflowExecution: &commonpb.WorkflowExecution{ - WorkflowId: weCtx.GetWorkflowKey().WorkflowID, - RunId: weCtx.GetWorkflowKey().RunID, + WorkflowId: wfKey.WorkflowID, + RunId: wfKey.RunID, }, UpdateId: req.GetRequest().GetRequest().GetMeta().GetUpdateId(), }, @@ -163,14 +184,13 @@ func Invoke( // TODO (alex-update): Consider moving this func to a better place. func addWorkflowTaskToMatching( ctx context.Context, - shardCtx shard.Context, - ms workflow.MutableState, - matchingClient matchingservice.MatchingServiceClient, + wfKey definition.WorkflowKey, wt *workflow.WorkflowTaskInfo, + wtScheduleToStartTimeout *time.Duration, nsID namespace.ID, + shardCtx shard.Context, + matchingClient matchingservice.MatchingServiceClient, ) error { - _, scheduleToStartTimeout := ms.TaskQueueScheduleToStartTimeout(wt.TaskQueue.Name) - wfKey := ms.GetWorkflowKey() clock, err := shardCtx.NewVectorClock() if err != nil { return err @@ -184,7 +204,7 @@ func addWorkflowTaskToMatching( }, TaskQueue: wt.TaskQueue, ScheduledEventId: wt.ScheduledEventID, - ScheduleToStartTimeout: scheduleToStartTimeout, + ScheduleToStartTimeout: wtScheduleToStartTimeout, Clock: clock, }) if err != nil {