Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix mutable state access after workflow lock is released #4333

Merged
merged 2 commits into from
May 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 91 additions & 59 deletions service/history/api/updateworkflow/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@ package updateworkflow
import (
"context"
"fmt"
"time"

commonpb "go.temporal.io/api/common/v1"
"go.temporal.io/api/serviceerror"
taskqueuepb "go.temporal.io/api/taskqueue/v1"
updatepb "go.temporal.io/api/update/v1"
"go.temporal.io/api/workflowservice/v1"

Expand All @@ -38,6 +40,7 @@ import (
enumsspb "go.temporal.io/server/api/enums/v1"
"go.temporal.io/server/api/historyservice/v1"
"go.temporal.io/server/api/matchingservice/v1"
"go.temporal.io/server/common"
"go.temporal.io/server/common/definition"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/internal/effect"
Expand All @@ -54,7 +57,7 @@ func Invoke(
shardCtx shard.Context,
workflowConsistencyChecker api.WorkflowConsistencyChecker,
matchingClient matchingservice.MatchingServiceClient,
) (_ *historyservice.UpdateWorkflowExecutionResponse, retErr error) {
) (*historyservice.UpdateWorkflowExecutionResponse, error) {

var waitLifecycleStage func(ctx context.Context, u *update.Update) (*updatepb.Outcome, error)
waitStage := req.GetRequest().GetWaitPolicy().GetLifecycleStage()
Expand All @@ -78,66 +81,95 @@ func Invoke(
fmt.Sprintf("%v is not implemented", waitStage))
}

weCtx, err := workflowConsistencyChecker.GetWorkflowContext(
ctx,
nil,
api.BypassMutableStateConsistencyPredicate,
definition.NewWorkflowKey(
req.NamespaceId,
req.Request.WorkflowExecution.WorkflowId,
req.Request.WorkflowExecution.RunId,
),
workflow.LockPriorityHigh,
wfKey := definition.NewWorkflowKey(
req.NamespaceId,
req.Request.WorkflowExecution.WorkflowId,
req.Request.WorkflowExecution.RunId,
)
if err != nil {
return nil, err
}
defer func() { weCtx.GetReleaseFn()(retErr) }()

ms := weCtx.GetMutableState()
if !ms.IsWorkflowExecutionRunning() {
return nil, consts.ErrWorkflowCompleted
}
// Variables shared with wfCtxOperation. Using values instead of pointers to make sure
// they are copied and don't have any pointers to workflow context or mutable state.
var (
upd *update.Update
taskQueue taskqueuepb.TaskQueue
scheduledEventID int64
scheduleToStartTimeout time.Duration
)

if req.GetRequest().GetFirstExecutionRunId() != "" && ms.GetExecutionInfo().GetFirstExecutionRunId() != req.GetRequest().GetFirstExecutionRunId() {
return nil, consts.ErrWorkflowExecutionNotFound
}
// Wrap the workflow-context-related operations in a separate func so that its fields
// (including any mutable state fields) cannot be used outside of this func after the workflow lock is released.
// It is important to release the workflow lock before calling matching.
wfCtxOperation := func() (retErr error) {
weCtx, err := workflowConsistencyChecker.GetWorkflowContext(
ctx,
nil,
api.BypassMutableStateConsistencyPredicate,
wfKey,
workflow.LockPriorityHigh,
)
if err != nil {
return err
}
defer func() { weCtx.GetReleaseFn()(retErr) }()

updateID := req.GetRequest().GetRequest().GetMeta().GetUpdateId()
updateReg := weCtx.GetUpdateRegistry(ctx)
upd, alreadyExisted, err := updateReg.FindOrCreate(ctx, updateID)
if err != nil {
return nil, err
}
if err := upd.OnMessage(ctx, req.GetRequest().GetRequest(), workflow.WithEffects(effect.Immediate(ctx), ms)); err != nil {
return nil, err
}
ms := weCtx.GetMutableState()
if !ms.IsWorkflowExecutionRunning() {
return consts.ErrWorkflowCompleted
}

// If WT is scheduled, but not started, updates will be attached to it, when WT is started.
// If WT has already started, new speculative WT will be created when started WT completes.
// If update is duplicate, then WT for this update was already created.
createNewWorkflowTask := !ms.HasPendingWorkflowTask() && !alreadyExisted
if req.GetRequest().GetFirstExecutionRunId() != "" && ms.GetExecutionInfo().GetFirstExecutionRunId() != req.GetRequest().GetFirstExecutionRunId() {
return consts.ErrWorkflowExecutionNotFound
}

if createNewWorkflowTask {
// This will try not to add an event but will create speculative WT in mutable state.
wt, err := ms.AddWorkflowTaskScheduledEvent(false, enumsspb.WORKFLOW_TASK_TYPE_SPECULATIVE)
if err != nil {
return nil, err
updateID := req.GetRequest().GetRequest().GetMeta().GetUpdateId()
updateReg := weCtx.GetUpdateRegistry(ctx)
var alreadyExisted bool
if upd, alreadyExisted, err = updateReg.FindOrCreate(ctx, updateID); err != nil {
return err
}
if err = upd.OnMessage(ctx, req.GetRequest().GetRequest(), workflow.WithEffects(effect.Immediate(ctx), ms)); err != nil {
return err
}
if wt.Type != enumsspb.WORKFLOW_TASK_TYPE_SPECULATIVE {
// This should never happen because WT is created as normal (despite speculative is requested)
// only if there were buffered events and because there were no pending WT, there can't be buffered events.
return nil, consts.ErrWorkflowTaskStateInconsistent

// If a WT is scheduled but not yet started, updates will be attached to it when it is started.
// If a WT has already started, a new speculative WT will be created when the started WT completes.
// If the update is a duplicate, then the WT for this update was already created.
createNewWorkflowTask := !ms.HasPendingWorkflowTask() && !alreadyExisted

if createNewWorkflowTask {
// This will try not to add an event but will create speculative WT in mutable state.
newWorkflowTask, err := ms.AddWorkflowTaskScheduledEvent(false, enumsspb.WORKFLOW_TASK_TYPE_SPECULATIVE)
if err != nil {
return err
}
if newWorkflowTask.Type != enumsspb.WORKFLOW_TASK_TYPE_SPECULATIVE {
// This should never happen: the WT is created as normal (even though speculative was requested)
// only if there are buffered events, and because there was no pending WT, there can't be any buffered events.
return consts.ErrWorkflowTaskStateInconsistent
}

scheduledEventID = newWorkflowTask.ScheduledEventID
if _, scheduleToStartTimeoutPtr := ms.TaskQueueScheduleToStartTimeout(ms.CurrentTaskQueue().Name); scheduleToStartTimeoutPtr != nil {
scheduleToStartTimeout = *scheduleToStartTimeoutPtr
}
taskQueue = taskqueuepb.TaskQueue{
Name: newWorkflowTask.TaskQueue.Name,
Kind: newWorkflowTask.TaskQueue.Kind,
}
}
return nil
}
err := wfCtxOperation()
if err != nil {
return nil, err
}

// It is important to release workflow lock before calling matching.
weCtx.GetReleaseFn()(nil)
err = addWorkflowTaskToMatching(ctx, shardCtx, ms, matchingClient, wt, namespace.ID(req.GetNamespaceId()))
// WT was created.
if scheduledEventID != common.EmptyEventID {
err = addWorkflowTaskToMatching(ctx, wfKey, &taskQueue, scheduledEventID, &scheduleToStartTimeout, namespace.ID(req.GetNamespaceId()), shardCtx, matchingClient)
if err != nil {
return nil, err
}
} else {
weCtx.GetReleaseFn()(nil)
}

updOutcome, err := waitLifecycleStage(ctx, upd)
Expand All @@ -148,8 +180,8 @@ func Invoke(
Response: &workflowservice.UpdateWorkflowExecutionResponse{
UpdateRef: &updatepb.UpdateRef{
WorkflowExecution: &commonpb.WorkflowExecution{
WorkflowId: weCtx.GetWorkflowKey().WorkflowID,
RunId: weCtx.GetWorkflowKey().RunID,
WorkflowId: wfKey.WorkflowID,
RunId: wfKey.RunID,
},
UpdateId: req.GetRequest().GetRequest().GetMeta().GetUpdateId(),
},
Expand All @@ -163,14 +195,14 @@ func Invoke(
// TODO (alex-update): Consider moving this func to a better place.
func addWorkflowTaskToMatching(
ctx context.Context,
wfKey definition.WorkflowKey,
tq *taskqueuepb.TaskQueue,
scheduledEventID int64,
wtScheduleToStartTimeout *time.Duration,
nsID namespace.ID,
shardCtx shard.Context,
ms workflow.MutableState,
matchingClient matchingservice.MatchingServiceClient,
wt *workflow.WorkflowTaskInfo,
nsID namespace.ID,
) error {
_, scheduleToStartTimeout := ms.TaskQueueScheduleToStartTimeout(wt.TaskQueue.Name)
wfKey := ms.GetWorkflowKey()
clock, err := shardCtx.NewVectorClock()
if err != nil {
return err
Expand All @@ -182,9 +214,9 @@ func addWorkflowTaskToMatching(
WorkflowId: wfKey.WorkflowID,
RunId: wfKey.RunID,
},
TaskQueue: wt.TaskQueue,
ScheduledEventId: wt.ScheduledEventID,
ScheduleToStartTimeout: scheduleToStartTimeout,
TaskQueue: tq,
ScheduledEventId: scheduledEventID,
ScheduleToStartTimeout: wtScheduleToStartTimeout,
Clock: clock,
})
if err != nil {
Expand Down
4 changes: 3 additions & 1 deletion service/history/workflow/update/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ type (
// to clients yet (e.g. accepted or outcome futures have not been set yet).
// The observable changes are bound to the EventStore's effect.Controller
// and will be triggered when those effects are applied.
// State transitions (OnMessage calls) must be done while holding the workflow lock.
Update struct {
// accessed only while holding workflow lock
id string
Expand Down Expand Up @@ -151,7 +152,7 @@ func newCompleted(

// WaitOutcome observes this Update's completion, returning the Outcome when it
// is available. This call will block until the Outcome is known or the provided
// context.Context expires.
// context.Context expires. It is safe to call this method outside of the workflow lock.
func (u *Update) WaitOutcome(ctx context.Context) (*updatepb.Outcome, error) {
	// Delegate to the outcome future and hand its result and error back unchanged.
	outcome, err := u.outcome.Get(ctx)
	return outcome, err
}
Expand All @@ -160,6 +161,7 @@ func (u *Update) WaitOutcome(ctx context.Context) (*updatepb.Outcome, error) {
// been accepted but not yet completed or the overall Outcome if the update has
// been completed (including completed by rejection). This call will block until
// the acceptance occurs or the provided context.Context expires.
// It is safe to call this method outside of the workflow lock.
func (u *Update) WaitAccepted(ctx context.Context) (*updatepb.Outcome, error) {
if u.outcome.Ready() {
// being complete implies being accepted, return the completed outcome
Expand Down