From 75776b37de8966168242a5666bbf9cd21685b88e Mon Sep 17 00:00:00 2001 From: Alex Shtin Date: Thu, 11 May 2023 14:08:05 -0700 Subject: [PATCH 1/2] Fix mutable state access after workflow lock is released --- service/history/api/updateworkflow/api.go | 149 +++++++++++++--------- service/history/workflow/update/update.go | 4 +- 2 files changed, 93 insertions(+), 60 deletions(-) diff --git a/service/history/api/updateworkflow/api.go b/service/history/api/updateworkflow/api.go index 97781b145ab..0ecdf3fb42b 100644 --- a/service/history/api/updateworkflow/api.go +++ b/service/history/api/updateworkflow/api.go @@ -27,9 +27,11 @@ package updateworkflow import ( "context" "fmt" + "time" commonpb "go.temporal.io/api/common/v1" "go.temporal.io/api/serviceerror" + taskqueuepb "go.temporal.io/api/taskqueue/v1" updatepb "go.temporal.io/api/update/v1" "go.temporal.io/api/workflowservice/v1" @@ -38,6 +40,7 @@ import ( enumsspb "go.temporal.io/server/api/enums/v1" "go.temporal.io/server/api/historyservice/v1" "go.temporal.io/server/api/matchingservice/v1" + "go.temporal.io/server/common" "go.temporal.io/server/common/definition" "go.temporal.io/server/common/namespace" "go.temporal.io/server/internal/effect" @@ -54,7 +57,7 @@ func Invoke( shardCtx shard.Context, workflowConsistencyChecker api.WorkflowConsistencyChecker, matchingClient matchingservice.MatchingServiceClient, -) (_ *historyservice.UpdateWorkflowExecutionResponse, retErr error) { +) (*historyservice.UpdateWorkflowExecutionResponse, error) { var waitLifecycleStage func(ctx context.Context, u *update.Update) (*updatepb.Outcome, error) waitStage := req.GetRequest().GetWaitPolicy().GetLifecycleStage() @@ -78,66 +81,94 @@ func Invoke( fmt.Sprintf("%v is not implemented", waitStage)) } - weCtx, err := workflowConsistencyChecker.GetWorkflowContext( - ctx, - nil, - api.BypassMutableStateConsistencyPredicate, - definition.NewWorkflowKey( - req.NamespaceId, - req.Request.WorkflowExecution.WorkflowId, - req.Request.WorkflowExecution.RunId, - ), - workflow.LockPriorityHigh, + wfKey := definition.NewWorkflowKey( + req.NamespaceId, + req.Request.WorkflowExecution.WorkflowId, + req.Request.WorkflowExecution.RunId, ) - if err != nil { - return nil, err - } - defer func() { weCtx.GetReleaseFn()(retErr) }() - ms := weCtx.GetMutableState() - if !ms.IsWorkflowExecutionRunning() { - return nil, consts.ErrWorkflowCompleted - } + // Variables shared with wfCtxOperation. Using values instead of pointers to make sure + // they are copied and don't have any pointers to workflow context or mutable state. + var ( + upd *update.Update + taskQueue taskqueuepb.TaskQueue + scheduledEventID int64 + scheduleToStartTimeout time.Duration + ) - if req.GetRequest().GetFirstExecutionRunId() != "" && ms.GetExecutionInfo().GetFirstExecutionRunId() != req.GetRequest().GetFirstExecutionRunId() { - return nil, consts.ErrWorkflowExecutionNotFound - } + // Wrapping workflow context related operation in separate func to prevent usage of its fields + // (including any mutable state fields) outside of this func after workflow lock is released. + // It is important to release workflow lock before calling matching. + wfCtxOperation := func() (retErr error) { + weCtx, err := workflowConsistencyChecker.GetWorkflowContext( + ctx, + nil, + api.BypassMutableStateConsistencyPredicate, + wfKey, + workflow.LockPriorityHigh, + ) + if err != nil { + return err + } + defer func() { weCtx.GetReleaseFn()(retErr) }() - updateID := req.GetRequest().GetRequest().GetMeta().GetUpdateId() - updateReg := weCtx.GetUpdateRegistry(ctx) - upd, alreadyExisted, err := updateReg.FindOrCreate(ctx, updateID) - if err != nil { - return nil, err - } - if err := upd.OnMessage(ctx, req.GetRequest().GetRequest(), workflow.WithEffects(effect.Immediate(ctx), ms)); err != nil { - return nil, err - } + ms := weCtx.GetMutableState() + if !ms.IsWorkflowExecutionRunning() { + return consts.ErrWorkflowCompleted + } - // If WT is scheduled, but not started, updates will be attached to it, when WT is started. - // If WT has already started, new speculative WT will be created when started WT completes. - // If update is duplicate, then WT for this update was already created. - createNewWorkflowTask := !ms.HasPendingWorkflowTask() && !alreadyExisted + if req.GetRequest().GetFirstExecutionRunId() != "" && ms.GetExecutionInfo().GetFirstExecutionRunId() != req.GetRequest().GetFirstExecutionRunId() { + return consts.ErrWorkflowExecutionNotFound + } - if createNewWorkflowTask { - // This will try not to add an event but will create speculative WT in mutable state. - wt, err := ms.AddWorkflowTaskScheduledEvent(false, enumsspb.WORKFLOW_TASK_TYPE_SPECULATIVE) - if err != nil { - return nil, err + updateID := req.GetRequest().GetRequest().GetMeta().GetUpdateId() + updateReg := weCtx.GetUpdateRegistry(ctx) + var alreadyExisted bool + if upd, alreadyExisted, err = updateReg.FindOrCreate(ctx, updateID); err != nil { + return err + } + if err = upd.OnMessage(ctx, req.GetRequest().GetRequest(), workflow.WithEffects(effect.Immediate(ctx), ms)); err != nil { + return err } - if wt.Type != enumsspb.WORKFLOW_TASK_TYPE_SPECULATIVE { - // This should never happen because WT is created as normal (despite speculative is requested) - // only if there were buffered events and because there were no pending WT, there can't be buffered events. - return nil, consts.ErrWorkflowTaskStateInconsistent + + // If WT is scheduled, but not started, updates will be attached to it, when WT is started. + // If WT has already started, new speculative WT will be created when started WT completes. + // If update is duplicate, then WT for this update was already created. + createNewWorkflowTask := !ms.HasPendingWorkflowTask() && !alreadyExisted + + if createNewWorkflowTask { + // This will try not to add an event but will create speculative WT in mutable state. + newWorkflowTask, err := ms.AddWorkflowTaskScheduledEvent(false, enumsspb.WORKFLOW_TASK_TYPE_SPECULATIVE) + if err != nil { + return err + } + if newWorkflowTask.Type != enumsspb.WORKFLOW_TASK_TYPE_SPECULATIVE { + // This should never happen because WT is created as normal (despite speculative is requested) + // only if there were buffered events and because there were no pending WT, there can't be buffered events. + return consts.ErrWorkflowTaskStateInconsistent + } + + scheduledEventID = newWorkflowTask.ScheduledEventID + _, scheduleToStartTimeoutPtr := ms.TaskQueueScheduleToStartTimeout(ms.CurrentTaskQueue().Name) + scheduleToStartTimeout = *scheduleToStartTimeoutPtr + taskQueue = taskqueuepb.TaskQueue{ + Name: newWorkflowTask.TaskQueue.Name, + Kind: newWorkflowTask.TaskQueue.Kind, + } } + return nil + } + err := wfCtxOperation() + if err != nil { + return nil, err + } - // It is important to release workflow lock before calling matching. - weCtx.GetReleaseFn()(nil) - err = addWorkflowTaskToMatching(ctx, shardCtx, ms, matchingClient, wt, namespace.ID(req.GetNamespaceId())) + // WT was created. + if scheduledEventID != common.EmptyEventID { + err = addWorkflowTaskToMatching(ctx, wfKey, &taskQueue, scheduledEventID, &scheduleToStartTimeout, namespace.ID(req.GetNamespaceId()), shardCtx, matchingClient) if err != nil { return nil, err } - } else { - weCtx.GetReleaseFn()(nil) } updOutcome, err := waitLifecycleStage(ctx, upd) @@ -148,8 +179,8 @@ func Invoke( Response: &workflowservice.UpdateWorkflowExecutionResponse{ UpdateRef: &updatepb.UpdateRef{ WorkflowExecution: &commonpb.WorkflowExecution{ - WorkflowId: weCtx.GetWorkflowKey().WorkflowID, - RunId: weCtx.GetWorkflowKey().RunID, + WorkflowId: wfKey.WorkflowID, + RunId: wfKey.RunID, }, UpdateId: req.GetRequest().GetRequest().GetMeta().GetUpdateId(), }, @@ -163,14 +194,14 @@ func Invoke( // TODO (alex-update): Consider moving this func to a better place. func addWorkflowTaskToMatching( ctx context.Context, + wfKey definition.WorkflowKey, + tq *taskqueuepb.TaskQueue, + scheduledEventID int64, + wtScheduleToStartTimeout *time.Duration, + nsID namespace.ID, shardCtx shard.Context, - ms workflow.MutableState, matchingClient matchingservice.MatchingServiceClient, - wt *workflow.WorkflowTaskInfo, - nsID namespace.ID, ) error { - _, scheduleToStartTimeout := ms.TaskQueueScheduleToStartTimeout(wt.TaskQueue.Name) - wfKey := ms.GetWorkflowKey() clock, err := shardCtx.NewVectorClock() if err != nil { return err @@ -182,9 +213,9 @@ func addWorkflowTaskToMatching( WorkflowId: wfKey.WorkflowID, RunId: wfKey.RunID, }, - TaskQueue: wt.TaskQueue, - ScheduledEventId: wt.ScheduledEventID, - ScheduleToStartTimeout: scheduleToStartTimeout, + TaskQueue: tq, + ScheduledEventId: scheduledEventID, + ScheduleToStartTimeout: wtScheduleToStartTimeout, Clock: clock, }) if err != nil { diff --git a/service/history/workflow/update/update.go b/service/history/workflow/update/update.go index 2f9eaa40636..b0622c98770 100644 --- a/service/history/workflow/update/update.go +++ b/service/history/workflow/update/update.go @@ -70,6 +70,7 @@ type ( // to clients yet (e.g. accepted or outcome futures have not been set yet). // The observable changes are bound to the EventStore's effect.Controller // and will be triggered when those effects are applied. + // State transitions (OnMessage calls) must be done while holding the workflow lock. Update struct { // accessed only while holding workflow lock id string @@ -151,7 +152,7 @@ func newCompleted( // WaitOutcome observes this Update's completion, returning the Outcome when it // is available. This call will block until the Outcome is known or the provided -// context.Context expires. +// context.Context expires. It is safe to call this method outside of workflow lock. func (u *Update) WaitOutcome(ctx context.Context) (*updatepb.Outcome, error) { return u.outcome.Get(ctx) } @@ -160,6 +161,7 @@ func (u *Update) WaitOutcome(ctx context.Context) (*updatepb.Outcome, error) { // been accepted but not yet completed or the overall Outcome if the update has // been completed (including completed by rejection). This call will block until // the acceptance occurs or the provided context.Context expires. +// It is safe to call this method outside of workflow lock. func (u *Update) WaitAccepted(ctx context.Context) (*updatepb.Outcome, error) { if u.outcome.Ready() { // being complete implies being accepted, return the completed outcome From f7e7706b973449425a0a11310103581e3d0a5ec5 Mon Sep 17 00:00:00 2001 From: Alex Shtin Date: Mon, 15 May 2023 16:06:33 -0700 Subject: [PATCH 2/2] Fix NPE --- service/history/api/updateworkflow/api.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/service/history/api/updateworkflow/api.go b/service/history/api/updateworkflow/api.go index 0ecdf3fb42b..b79326e84ce 100644 --- a/service/history/api/updateworkflow/api.go +++ b/service/history/api/updateworkflow/api.go @@ -149,8 +149,9 @@ func Invoke( } scheduledEventID = newWorkflowTask.ScheduledEventID - _, scheduleToStartTimeoutPtr := ms.TaskQueueScheduleToStartTimeout(ms.CurrentTaskQueue().Name) - scheduleToStartTimeout = *scheduleToStartTimeoutPtr + if _, scheduleToStartTimeoutPtr := ms.TaskQueueScheduleToStartTimeout(ms.CurrentTaskQueue().Name); scheduleToStartTimeoutPtr != nil { + scheduleToStartTimeout = *scheduleToStartTimeoutPtr + } taskQueue = taskqueuepb.TaskQueue{ Name: newWorkflowTask.TaskQueue.Name, Kind: newWorkflowTask.TaskQueue.Kind,