diff --git a/service/history/api/consistency_checker.go b/service/history/api/consistency_checker.go index 18080e5c2183..5fa3f6005727 100644 --- a/service/history/api/consistency_checker.go +++ b/service/history/api/consistency_checker.go @@ -59,6 +59,7 @@ type ( reqClock *clockspb.VectorClock, consistencyPredicate MutableStateConsistencyPredicate, workflowKey definition.WorkflowKey, + lockPriority workflow.LockPriority, ) (WorkflowContext, error) } @@ -108,6 +109,7 @@ func (c *WorkflowConsistencyCheckerImpl) GetWorkflowContext( reqClock *clockspb.VectorClock, consistencyPredicate MutableStateConsistencyPredicate, workflowKey definition.WorkflowKey, + lockPriority workflow.LockPriority, ) (WorkflowContext, error) { if reqClock != nil { currentClock := c.shardContext.CurrentVectorClock() @@ -117,6 +119,7 @@ func (c *WorkflowConsistencyCheckerImpl) GetWorkflowContext( reqClock, currentClock, workflowKey, + lockPriority, ) } // request vector clock cannot is not comparable with current shard vector clock @@ -133,6 +136,7 @@ func (c *WorkflowConsistencyCheckerImpl) GetWorkflowContext( &shardOwnershipAsserted, consistencyPredicate, workflowKey, + lockPriority, ) } return c.getCurrentWorkflowContext( @@ -141,6 +145,7 @@ func (c *WorkflowConsistencyCheckerImpl) GetWorkflowContext( consistencyPredicate, workflowKey.NamespaceID, workflowKey.WorkflowID, + lockPriority, ) } @@ -149,6 +154,7 @@ func (c *WorkflowConsistencyCheckerImpl) getWorkflowContextValidatedByClock( reqClock *clockspb.VectorClock, currentClock *clockspb.VectorClock, workflowKey definition.WorkflowKey, + lockPriority workflow.LockPriority, ) (WorkflowContext, error) { cmpResult, err := vclock.Compare(reqClock, currentClock) if err != nil { @@ -170,7 +176,7 @@ func (c *WorkflowConsistencyCheckerImpl) getWorkflowContextValidatedByClock( WorkflowId: workflowKey.WorkflowID, RunId: workflowKey.RunID, }, - workflow.CallerTypeAPI, + lockPriority, ) if err != nil { return nil, err @@ -189,6 +195,7 @@ func (c *WorkflowConsistencyCheckerImpl) getWorkflowContextValidatedByCheck( shardOwnershipAsserted *bool, consistencyPredicate MutableStateConsistencyPredicate, workflowKey definition.WorkflowKey, + lockPriority workflow.LockPriority, ) (WorkflowContext, error) { if len(workflowKey.RunID) == 0 { return nil, serviceerror.NewInternal(fmt.Sprintf( @@ -203,7 +210,7 @@ func (c *WorkflowConsistencyCheckerImpl) getWorkflowContextValidatedByCheck( WorkflowId: workflowKey.WorkflowID, RunId: workflowKey.RunID, }, - workflow.CallerTypeAPI, + lockPriority, ) if err != nil { return nil, err @@ -245,6 +252,7 @@ func (c *WorkflowConsistencyCheckerImpl) getCurrentWorkflowContext( consistencyPredicate MutableStateConsistencyPredicate, namespaceID string, workflowID string, + lockPriority workflow.LockPriority, ) (WorkflowContext, error) { runID, err := c.getCurrentRunID( ctx, @@ -260,6 +268,7 @@ func (c *WorkflowConsistencyCheckerImpl) getCurrentWorkflowContext( shardOwnershipAsserted, consistencyPredicate, definition.NewWorkflowKey(namespaceID, workflowID, runID), + lockPriority, ) if err != nil { return nil, err diff --git a/service/history/api/consistency_checker_test.go b/service/history/api/consistency_checker_test.go index 62862d9a237e..7b288b678fe6 100644 --- a/service/history/api/consistency_checker_test.go +++ b/service/history/api/consistency_checker_test.go @@ -115,7 +115,7 @@ func (s *workflowConsistencyCheckerSuite) TestGetWorkflowContextValidatedByCheck WorkflowId: s.workflowID, RunId: s.currentRunID, }, - workflow.CallerTypeAPI, + workflow.LockPriorityHigh, ).Return(wfContext, releaseFn, nil) wfContext.EXPECT().LoadMutableState(ctx).Return(mutableState, nil) @@ -124,6 +124,7 @@ func (s *workflowConsistencyCheckerSuite) TestGetWorkflowContextValidatedByCheck &shardOwnershipAsserted, BypassMutableStateConsistencyPredicate, definition.NewWorkflowKey(s.namespaceID, s.workflowID, s.currentRunID), + workflow.LockPriorityHigh, ) s.NoError(err) s.Equal(mutableState, workflowContext.GetMutableState()) @@ -147,7 +148,7 @@ func (s *workflowConsistencyCheckerSuite) TestGetWorkflowContextValidatedByCheck WorkflowId: s.workflowID, RunId: s.currentRunID, }, - workflow.CallerTypeAPI, + workflow.LockPriorityHigh, ).Return(wfContext, releaseFn, nil) gomock.InOrder( wfContext.EXPECT().LoadMutableState(ctx).Return(mutableState1, nil), @@ -160,6 +161,7 @@ func (s *workflowConsistencyCheckerSuite) TestGetWorkflowContextValidatedByCheck &shardOwnershipAsserted, FailMutableStateConsistencyPredicate, definition.NewWorkflowKey(s.namespaceID, s.workflowID, s.currentRunID), + workflow.LockPriorityHigh, ) s.NoError(err) s.Equal(mutableState2, workflowContext.GetMutableState()) @@ -181,7 +183,7 @@ func (s *workflowConsistencyCheckerSuite) TestGetWorkflowContextValidatedByCheck WorkflowId: s.workflowID, RunId: s.currentRunID, }, - workflow.CallerTypeAPI, + workflow.LockPriorityHigh, ).Return(wfContext, releaseFn, nil) wfContext.EXPECT().LoadMutableState(ctx).Return(nil, serviceerror.NewNotFound("")) @@ -192,6 +194,7 @@ func (s *workflowConsistencyCheckerSuite) TestGetWorkflowContextValidatedByCheck &shardOwnershipAsserted, FailMutableStateConsistencyPredicate, definition.NewWorkflowKey(s.namespaceID, s.workflowID, s.currentRunID), + workflow.LockPriorityHigh, ) s.IsType(&serviceerror.NotFound{}, err) s.Nil(workflowContext) @@ -213,7 +216,7 @@ func (s *workflowConsistencyCheckerSuite) TestGetWorkflowContextValidatedByCheck WorkflowId: s.workflowID, RunId: s.currentRunID, }, - workflow.CallerTypeAPI, + workflow.LockPriorityHigh, ).Return(wfContext, releaseFn, nil) wfContext.EXPECT().LoadMutableState(ctx).Return(nil, serviceerror.NewNotFound("")) @@ -224,6 +227,7 @@ func (s *workflowConsistencyCheckerSuite) TestGetWorkflowContextValidatedByCheck &shardOwnershipAsserted, FailMutableStateConsistencyPredicate, definition.NewWorkflowKey(s.namespaceID, s.workflowID, s.currentRunID), + workflow.LockPriorityHigh, ) s.IsType(&persistence.ShardOwnershipLostError{}, err) s.Nil(workflowContext) @@ -245,7 +249,7 @@ func (s *workflowConsistencyCheckerSuite) TestGetWorkflowContextValidatedByCheck WorkflowId: s.workflowID, RunId: s.currentRunID, }, - workflow.CallerTypeAPI, + workflow.LockPriorityHigh, ).Return(wfContext, releaseFn, nil) wfContext.EXPECT().LoadMutableState(ctx).Return(nil, serviceerror.NewUnavailable("")) @@ -254,6 +258,7 @@ func (s *workflowConsistencyCheckerSuite) TestGetWorkflowContextValidatedByCheck &shardOwnershipAsserted, FailMutableStateConsistencyPredicate, definition.NewWorkflowKey(s.namespaceID, s.workflowID, s.currentRunID), + workflow.LockPriorityHigh, ) s.IsType(&serviceerror.Unavailable{}, err) s.Nil(workflowContext) diff --git a/service/history/api/deleteworkflow/api.go b/service/history/api/deleteworkflow/api.go index be84bcbf4ae9..368bd2696057 100644 --- a/service/history/api/deleteworkflow/api.go +++ b/service/history/api/deleteworkflow/api.go @@ -55,6 +55,7 @@ func Invoke( request.WorkflowExecution.WorkflowId, request.WorkflowExecution.RunId, ), + workflow.LockPriorityLow, ) if err != nil { return nil, err diff --git a/service/history/api/describemutablestate/api.go b/service/history/api/describemutablestate/api.go index b7fdda2b4a3c..1549f0a80d36 100644 --- a/service/history/api/describemutablestate/api.go +++ b/service/history/api/describemutablestate/api.go @@ -56,6 +56,7 @@ func Invoke( req.Execution.WorkflowId, req.Execution.RunId, ), + workflow.LockPriorityHigh, ) if err != nil { return nil, err diff --git a/service/history/api/describeworkflow/api.go b/service/history/api/describeworkflow/api.go index 9c0becf0fbbd..e3f973b8671b 100644 --- a/service/history/api/describeworkflow/api.go +++ b/service/history/api/describeworkflow/api.go @@ -68,6 +68,7 @@ func Invoke( req.Request.Execution.WorkflowId, req.Request.Execution.RunId, ), + workflow.LockPriorityHigh, ) if err != nil { return nil, err diff --git a/service/history/api/get_workflow_util.go b/service/history/api/get_workflow_util.go index 8e0c440544e7..60425daa8ef6 100644 --- a/service/history/api/get_workflow_util.go +++ b/service/history/api/get_workflow_util.go @@ -162,6 +162,7 @@ func GetMutableState( nil, BypassMutableStateConsistencyPredicate, workflowKey, + workflow.LockPriorityHigh, ) if err != nil { return nil, err diff --git a/service/history/api/queryworkflow/api.go b/service/history/api/queryworkflow/api.go index 66ce2bf10392..ef6d932f2f24 100644 --- a/service/history/api/queryworkflow/api.go +++ b/service/history/api/queryworkflow/api.go @@ -86,6 +86,7 @@ func Invoke( nil, api.BypassMutableStateConsistencyPredicate, workflowKey, + workflow.LockPriorityHigh, ) if err != nil { return nil, err diff --git a/service/history/api/refreshworkflow/api.go b/service/history/api/refreshworkflow/api.go index 35e8cd61bbc8..f21cb33b2d43 100644 --- a/service/history/api/refreshworkflow/api.go +++ b/service/history/api/refreshworkflow/api.go @@ -51,6 +51,7 @@ func Invoke( nil, api.BypassMutableStateConsistencyPredicate, workflowKey, + workflow.LockPriorityLow, ) if err != nil { return err diff --git a/service/history/api/replication/generate_task.go b/service/history/api/replication/generate_task.go index be35b0870e0b..20ab90b23f9c 100644 --- a/service/history/api/replication/generate_task.go +++ b/service/history/api/replication/generate_task.go @@ -34,6 +34,7 @@ import ( "go.temporal.io/server/service/history/api" "go.temporal.io/server/service/history/shard" "go.temporal.io/server/service/history/tasks" + "go.temporal.io/server/service/history/workflow" ) func GenerateTask( @@ -57,6 +58,7 @@ func GenerateTask( request.Execution.WorkflowId, request.Execution.RunId, ), + workflow.LockPriorityHigh, ) if err != nil { return nil, err diff --git a/service/history/api/resetworkflow/api.go b/service/history/api/resetworkflow/api.go index a13c5e79ef2c..e15c2a4fa59a 100644 --- a/service/history/api/resetworkflow/api.go +++ b/service/history/api/resetworkflow/api.go @@ -39,6 +39,7 @@ import ( "go.temporal.io/server/service/history/api" "go.temporal.io/server/service/history/ndc" "go.temporal.io/server/service/history/shard" + "go.temporal.io/server/service/history/workflow" ) func Invoke( @@ -66,6 +67,7 @@ func Invoke( workflowID, baseRunID, ), + workflow.LockPriorityHigh, ) if err != nil { return nil, err @@ -104,6 +106,7 @@ func Invoke( workflowID, currentRunID, ), + workflow.LockPriorityHigh, ) if err != nil { return nil, err diff --git a/service/history/api/signalwithstartworkflow/api.go b/service/history/api/signalwithstartworkflow/api.go index fc90a1469859..8ba98bd5cf6f 100644 --- a/service/history/api/signalwithstartworkflow/api.go +++ b/service/history/api/signalwithstartworkflow/api.go @@ -35,6 +35,7 @@ import ( "go.temporal.io/server/common/namespace" "go.temporal.io/server/service/history/api" "go.temporal.io/server/service/history/shard" + "go.temporal.io/server/service/history/workflow" ) func Invoke( @@ -59,6 +60,7 @@ func Invoke( signalWithStartRequest.SignalWithStartRequest.WorkflowId, "", ), + workflow.LockPriorityHigh, ) switch err.(type) { case nil: diff --git a/service/history/api/startworkflow/api.go b/service/history/api/startworkflow/api.go index f5f1e7006ec9..a5ed7aaceaff 100644 --- a/service/history/api/startworkflow/api.go +++ b/service/history/api/startworkflow/api.go @@ -419,7 +419,7 @@ func (s *Starter) getMutableStateInfo(ctx context.Context, runID string) (*mutab ctx, s.namespace.ID(), commonpb.WorkflowExecution{WorkflowId: s.request.StartRequest.WorkflowId, RunId: runID}, - workflow.CallerTypeAPI, + workflow.LockPriorityHigh, ) if err != nil { return nil, err diff --git a/service/history/api/update_workflow_util.go b/service/history/api/update_workflow_util.go index 16832b4bbae6..74d5bf8f01f5 100644 --- a/service/history/api/update_workflow_util.go +++ b/service/history/api/update_workflow_util.go @@ -49,6 +49,7 @@ func GetAndUpdateWorkflowWithNew( reqClock, consistencyCheckFn, workflowKey, + workflow.LockPriorityHigh, ) if err != nil { return err diff --git a/service/history/api/updateworkflow/api.go b/service/history/api/updateworkflow/api.go index f065714d19fb..234f97ed3166 100644 --- a/service/history/api/updateworkflow/api.go +++ b/service/history/api/updateworkflow/api.go @@ -60,6 +60,7 @@ func Invoke( req.Request.WorkflowExecution.WorkflowId, req.Request.WorkflowExecution.RunId, ), + workflow.LockPriorityHigh, ) if err != nil { return nil, err diff --git a/service/history/api/verifychildworkflowcompletionrecorded/api.go b/service/history/api/verifychildworkflowcompletionrecorded/api.go index 753b4a58b6fc..5fc8ec88fdcc 100644 --- a/service/history/api/verifychildworkflowcompletionrecorded/api.go +++ b/service/history/api/verifychildworkflowcompletionrecorded/api.go @@ -35,6 +35,7 @@ import ( "go.temporal.io/server/service/history/api" "go.temporal.io/server/service/history/consts" "go.temporal.io/server/service/history/shard" + "go.temporal.io/server/service/history/workflow" ) func Invoke( @@ -61,6 +62,7 @@ func Invoke( request.ParentExecution.WorkflowId, request.ParentExecution.RunId, ), + workflow.LockPriorityLow, ) if err != nil { return nil, err diff --git a/service/history/historyEngine2_test.go b/service/history/historyEngine2_test.go index 2bf20fe1aa8c..d1940c7e8680 100644 --- a/service/history/historyEngine2_test.go +++ b/service/history/historyEngine2_test.go @@ -466,7 +466,7 @@ func (s *engine2Suite) TestRecordWorkflowTaskStartedSuccess() { metrics.AddMetricsContext(context.Background()), tests.NamespaceID, workflowExecution, - workflow.CallerTypeAPI, + workflow.LockPriorityHigh, ) s.NoError(err) loadedMS, err := ctx.LoadMutableState(context.Background()) @@ -1946,7 +1946,7 @@ func (s *engine2Suite) getMutableState(namespaceID namespace.ID, we commonpb.Wor metrics.AddMetricsContext(context.Background()), namespaceID, we, - workflow.CallerTypeAPI, + workflow.LockPriorityHigh, ) if err != nil { return nil diff --git a/service/history/historyEngine_test.go b/service/history/historyEngine_test.go index ce6bbc8d1320..7120757b8a43 100644 --- a/service/history/historyEngine_test.go +++ b/service/history/historyEngine_test.go @@ -653,7 +653,7 @@ func (s *engineSuite) TestQueryWorkflow_ConsistentQueryBufferFull() { context.Background(), tests.NamespaceID, execution, - workflow.CallerTypeAPI, + workflow.LockPriorityHigh, ) s.NoError(err) loadedMS, err := ctx.LoadMutableState(context.Background()) @@ -4365,7 +4365,7 @@ func (s *engineSuite) TestRequestCancel_RespondWorkflowTaskCompleted_SuccessWith context.Background(), tests.NamespaceID, we, - workflow.CallerTypeAPI, + workflow.LockPriorityHigh, ) s.NoError(err) loadedMS, err := ctx.LoadMutableState(context.Background()) @@ -5230,7 +5230,7 @@ func (s *engineSuite) getMutableState(testNamespaceID namespace.ID, we commonpb. context.Background(), tests.NamespaceID, we, - workflow.CallerTypeAPI, + workflow.LockPriorityHigh, ) if err != nil { return nil diff --git a/service/history/ndc/activity_replicator.go b/service/history/ndc/activity_replicator.go index 5b6de6238462..c32ba6b6821a 100644 --- a/service/history/ndc/activity_replicator.go +++ b/service/history/ndc/activity_replicator.go @@ -105,7 +105,7 @@ func (r *ActivityReplicatorImpl) SyncActivity( ctx, namespaceID, execution, - workflow.CallerTypeAPI, + workflow.LockPriorityHigh, ) if err != nil { // for get workflow execution context, with valid run id diff --git a/service/history/ndc/activity_replicator_test.go b/service/history/ndc/activity_replicator_test.go index 6a2d0f89e447..5c43ca32d31c 100644 --- a/service/history/ndc/activity_replicator_test.go +++ b/service/history/ndc/activity_replicator_test.go @@ -642,8 +642,8 @@ func (s *activityReplicatorSuite) TestSyncActivity_WorkflowClosed() { key := definition.NewWorkflowKey(namespaceID.String(), workflowID, runID) weContext := workflow.NewMockContext(s.controller) weContext.EXPECT().LoadMutableState(gomock.Any()).Return(s.mockMutableState, nil) - weContext.EXPECT().Lock(gomock.Any(), workflow.CallerTypeAPI).Return(nil) - weContext.EXPECT().Unlock(workflow.CallerTypeAPI) + weContext.EXPECT().Lock(gomock.Any(), workflow.LockPriorityHigh).Return(nil) + weContext.EXPECT().Unlock(workflow.LockPriorityHigh) _, err := s.workflowCache.PutIfNotExist(key, weContext) s.NoError(err) @@ -716,8 +716,8 @@ func (s *activityReplicatorSuite) TestSyncActivity_ActivityNotFound() { key := definition.NewWorkflowKey(namespaceID.String(), workflowID, runID) weContext := workflow.NewMockContext(s.controller) weContext.EXPECT().LoadMutableState(gomock.Any()).Return(s.mockMutableState, nil) - weContext.EXPECT().Lock(gomock.Any(), workflow.CallerTypeAPI).Return(nil) - weContext.EXPECT().Unlock(workflow.CallerTypeAPI) + weContext.EXPECT().Lock(gomock.Any(), workflow.LockPriorityHigh).Return(nil) + weContext.EXPECT().Unlock(workflow.LockPriorityHigh) _, err := s.workflowCache.PutIfNotExist(key, weContext) s.NoError(err) @@ -791,8 +791,8 @@ func (s *activityReplicatorSuite) TestSyncActivity_ActivityFound_Zombie() { key := definition.NewWorkflowKey(namespaceID.String(), workflowID, runID) weContext := workflow.NewMockContext(s.controller) weContext.EXPECT().LoadMutableState(gomock.Any()).Return(s.mockMutableState, nil) - weContext.EXPECT().Lock(gomock.Any(), workflow.CallerTypeAPI).Return(nil) - weContext.EXPECT().Unlock(workflow.CallerTypeAPI) + weContext.EXPECT().Lock(gomock.Any(), workflow.LockPriorityHigh).Return(nil) + weContext.EXPECT().Unlock(workflow.LockPriorityHigh) _, err := s.workflowCache.PutIfNotExist(key, weContext) s.NoError(err) @@ -884,8 +884,8 @@ func (s *activityReplicatorSuite) TestSyncActivity_ActivityFound_NonZombie() { key := definition.NewWorkflowKey(namespaceID.String(), workflowID, runID) weContext := workflow.NewMockContext(s.controller) weContext.EXPECT().LoadMutableState(gomock.Any()).Return(s.mockMutableState, nil) - weContext.EXPECT().Lock(gomock.Any(), workflow.CallerTypeAPI).Return(nil) - weContext.EXPECT().Unlock(workflow.CallerTypeAPI) + weContext.EXPECT().Lock(gomock.Any(), workflow.LockPriorityHigh).Return(nil) + weContext.EXPECT().Unlock(workflow.LockPriorityHigh) _, err := s.workflowCache.PutIfNotExist(key, weContext) s.NoError(err) diff --git a/service/history/ndc/history_replicator.go b/service/history/ndc/history_replicator.go index 03ef2892fb19..ad4c3c71cca3 100644 --- a/service/history/ndc/history_replicator.go +++ b/service/history/ndc/history_replicator.go @@ -247,7 +247,7 @@ func (r *HistoryReplicatorImpl) ApplyWorkflowState( WorkflowId: wid, RunId: rid, }, - workflow.CallerTypeTask, + workflow.LockPriorityLow, ) if err != nil { return err @@ -353,7 +353,7 @@ func (r *HistoryReplicatorImpl) applyEvents( ctx, task.getNamespaceID(), *task.getExecution(), - workflow.CallerTypeAPI, + workflow.LockPriorityHigh, ) if err != nil { // for get workflow execution context, with valid run id diff --git a/service/history/ndc/history_replicator_test.go b/service/history/ndc/history_replicator_test.go index 08fe9879996f..7f0461cb1dbc 100644 --- a/service/history/ndc/history_replicator_test.go +++ b/service/history/ndc/history_replicator_test.go @@ -177,7 +177,7 @@ func (s *historyReplicatorSuite) Test_ApplyWorkflowState_BrandNew() { gomock.Any(), namespace.ID(namespaceID), we, - workflow.CallerTypeTask, + workflow.LockPriorityLow, ).Return(mockWeCtx, wcache.NoopReleaseFn, nil) mockWeCtx.EXPECT().CreateWorkflowExecution( gomock.Any(), @@ -289,7 +289,7 @@ func (s *historyReplicatorSuite) Test_ApplyWorkflowState_Ancestors() { gomock.Any(), namespace.ID(namespaceID), we, - workflow.CallerTypeTask, + workflow.LockPriorityLow, ).Return(mockWeCtx, wcache.NoopReleaseFn, nil) mockWeCtx.EXPECT().CreateWorkflowExecution( gomock.Any(), diff --git a/service/history/ndc/transaction_manager.go b/service/history/ndc/transaction_manager.go index c2a86d87ce9b..3fae560f392f 100644 --- a/service/history/ndc/transaction_manager.go +++ b/service/history/ndc/transaction_manager.go @@ -444,7 +444,7 @@ func (r *transactionMgrImpl) loadWorkflow( WorkflowId: workflowID, RunId: runID, }, - workflow.CallerTypeAPI, + workflow.LockPriorityHigh, ) if err != nil { return nil, err diff --git a/service/history/ndc/workflow_resetter.go b/service/history/ndc/workflow_resetter.go index 0a9631490c36..f789191c30e5 100644 --- a/service/history/ndc/workflow_resetter.go +++ b/service/history/ndc/workflow_resetter.go @@ -631,7 +631,7 @@ func (r *workflowResetterImpl) reapplyContinueAsNewWorkflowEvents( WorkflowId: workflowID, RunId: runID, }, - workflow.CallerTypeAPI, + workflow.LockPriorityHigh, ) if err != nil { return 0, nil, err diff --git a/service/history/ndc/workflow_resetter_test.go b/service/history/ndc/workflow_resetter_test.go index e87c455fd8e1..4dfd75812843 100644 --- a/service/history/ndc/workflow_resetter_test.go +++ b/service/history/ndc/workflow_resetter_test.go @@ -699,8 +699,8 @@ func (s *workflowResetterSuite) TestReapplyContinueAsNewWorkflowEvents_WithConti }, nil) resetContext := workflow.NewMockContext(s.controller) - resetContext.EXPECT().Lock(gomock.Any(), workflow.CallerTypeAPI).Return(nil) - resetContext.EXPECT().Unlock(workflow.CallerTypeAPI) + resetContext.EXPECT().Lock(gomock.Any(), workflow.LockPriorityHigh).Return(nil) + resetContext.EXPECT().Unlock(workflow.LockPriorityHigh) resetMutableState := workflow.NewMockMutableState(s.controller) resetContext.EXPECT().LoadMutableState(gomock.Any()).Return(resetMutableState, nil) resetMutableState.EXPECT().GetNextEventID().Return(newNextEventID).AnyTimes() diff --git a/service/history/replication/ack_manager.go b/service/history/replication/ack_manager.go index dc8a8d2264bb..8321406eb955 100644 --- a/service/history/replication/ack_manager.go +++ b/service/history/replication/ack_manager.go @@ -610,7 +610,7 @@ func (p *ackMgrImpl) processReplication( ctx, namespaceID, execution, - workflow.CallerTypeTask, + workflow.LockPriorityLow, ) if err != nil { return nil, err @@ -659,7 +659,7 @@ func (p *ackMgrImpl) processNewRunReplication( WorkflowId: workflowID, RunId: newRunID, }, - workflow.CallerTypeTask, + workflow.LockPriorityLow, ) if err != nil { return nil, err diff --git a/service/history/replication/task_executor.go b/service/history/replication/task_executor.go index 46d870b7aaf8..00181b06d323 100644 --- a/service/history/replication/task_executor.go +++ b/service/history/replication/task_executor.go @@ -363,7 +363,7 @@ func (e *taskExecutorImpl) cleanupWorkflowExecution(ctx context.Context, namespa WorkflowId: workflowID, RunId: runID, } - wfCtx, releaseFn, err := e.workflowCache.GetOrCreateWorkflowExecution(ctx, nsID, ex, workflow.CallerTypeTask) + wfCtx, releaseFn, err := e.workflowCache.GetOrCreateWorkflowExecution(ctx, nsID, ex, workflow.LockPriorityLow) if err != nil { return err } diff --git a/service/history/timerQueueTaskExecutorBase.go b/service/history/timerQueueTaskExecutorBase.go index fa32cf064dab..d1e2c95e4f99 100644 --- a/service/history/timerQueueTaskExecutorBase.go +++ b/service/history/timerQueueTaskExecutorBase.go @@ -39,7 +39,7 @@ import ( "go.temporal.io/server/common/persistence" "go.temporal.io/server/service/history/configs" "go.temporal.io/server/service/history/consts" - deletemanager "go.temporal.io/server/service/history/deletemanager" + "go.temporal.io/server/service/history/deletemanager" "go.temporal.io/server/service/history/shard" "go.temporal.io/server/service/history/tasks" "go.temporal.io/server/service/history/workflow" @@ -100,7 +100,7 @@ func (t *timerQueueTaskExecutorBase) executeDeleteHistoryEventTask( ctx, namespace.ID(task.GetNamespaceID()), workflowExecution, - workflow.CallerTypeTask, + workflow.LockPriorityLow, ) if err != nil { return err @@ -177,7 +177,7 @@ func getWorkflowExecutionContext( ctx, namespaceID, execution, - workflow.CallerTypeTask, + workflow.LockPriorityLow, ) if common.IsContextDeadlineExceededErr(err) { err = consts.ErrWorkflowBusy diff --git a/service/history/timerQueueTaskExecutorBase_test.go b/service/history/timerQueueTaskExecutorBase_test.go index 55899eeefaed..6813805780f8 100644 --- a/service/history/timerQueueTaskExecutorBase_test.go +++ b/service/history/timerQueueTaskExecutorBase_test.go @@ -42,7 +42,7 @@ import ( "go.temporal.io/server/common/cluster" "go.temporal.io/server/common/definition" "go.temporal.io/server/common/metrics" - deletemanager "go.temporal.io/server/service/history/deletemanager" + "go.temporal.io/server/service/history/deletemanager" "go.temporal.io/server/service/history/shard" "go.temporal.io/server/service/history/tasks" "go.temporal.io/server/service/history/tests" @@ -130,7 +130,7 @@ func (s *timerQueueTaskExecutorBaseSuite) Test_executeDeleteHistoryEventTask_NoE mockWeCtx := workflow.NewMockContext(s.controller) mockMutableState := workflow.NewMockMutableState(s.controller) - s.mockCache.EXPECT().GetOrCreateWorkflowExecution(gomock.Any(), tests.NamespaceID, we, workflow.CallerTypeTask).Return(mockWeCtx, wcache.NoopReleaseFn, nil) + s.mockCache.EXPECT().GetOrCreateWorkflowExecution(gomock.Any(), tests.NamespaceID, we, workflow.LockPriorityLow).Return(mockWeCtx, wcache.NoopReleaseFn, nil) mockWeCtx.EXPECT().LoadMutableState(gomock.Any()).Return(mockMutableState, nil) mockMutableState.EXPECT().GetLastWriteVersion().Return(int64(1), nil) @@ -182,7 +182,7 @@ func (s *timerQueueTaskExecutorBaseSuite) TestArchiveHistory_DeleteFailed() { mockWeCtx := workflow.NewMockContext(s.controller) mockMutableState := workflow.NewMockMutableState(s.controller) - s.mockCache.EXPECT().GetOrCreateWorkflowExecution(gomock.Any(), tests.NamespaceID, we, workflow.CallerTypeTask).Return(mockWeCtx, wcache.NoopReleaseFn, nil) + s.mockCache.EXPECT().GetOrCreateWorkflowExecution(gomock.Any(), tests.NamespaceID, we, workflow.LockPriorityLow).Return(mockWeCtx, wcache.NoopReleaseFn, nil) mockWeCtx.EXPECT().LoadMutableState(gomock.Any()).Return(mockMutableState, nil) mockMutableState.EXPECT().GetLastWriteVersion().Return(int64(1), nil) diff --git a/service/history/transferQueueTaskExecutorBase.go b/service/history/transferQueueTaskExecutorBase.go index c56ef7f58cce..18a86c474c0b 100644 --- a/service/history/transferQueueTaskExecutorBase.go +++ b/service/history/transferQueueTaskExecutorBase.go @@ -45,7 +45,7 @@ import ( "go.temporal.io/server/common/searchattribute" "go.temporal.io/server/service/history/configs" "go.temporal.io/server/service/history/consts" - deletemanager "go.temporal.io/server/service/history/deletemanager" + "go.temporal.io/server/service/history/deletemanager" "go.temporal.io/server/service/history/queues" "go.temporal.io/server/service/history/shard" "go.temporal.io/server/service/history/tasks" @@ -256,7 +256,7 @@ func (t *transferQueueTaskExecutorBase) deleteExecution( ctx, namespace.ID(task.GetNamespaceID()), workflowExecution, - workflow.CallerTypeTask, + workflow.LockPriorityLow, ) if err != nil { return err diff --git a/service/history/workflow/cache/cache.go b/service/history/workflow/cache/cache.go index bb8e07be39d8..ce19d016ec4d 100644 --- a/service/history/workflow/cache/cache.go +++ b/service/history/workflow/cache/cache.go @@ -35,6 +35,7 @@ import ( "github.com/pborman/uuid" commonpb "go.temporal.io/api/common/v1" "go.temporal.io/api/serviceerror" + "go.temporal.io/server/common/cache" "go.temporal.io/server/common/definition" "go.temporal.io/server/common/log" @@ -59,7 +60,7 @@ type ( ctx context.Context, namespaceID namespace.ID, execution commonpb.WorkflowExecution, - caller workflow.CallerType, + lockPriority workflow.LockPriority, ) (workflow.Context, ReleaseCacheFunc, error) } @@ -101,7 +102,7 @@ func (c *CacheImpl) GetOrCreateWorkflowExecution( ctx context.Context, namespaceID namespace.ID, execution commonpb.WorkflowExecution, - caller workflow.CallerType, + lockPriority workflow.LockPriority, ) (workflow.Context, ReleaseCacheFunc, error) { if err := c.validateWorkflowExecutionInfo(ctx, namespaceID, &execution); err != nil { @@ -119,7 +120,7 @@ func (c *CacheImpl) GetOrCreateWorkflowExecution( execution, handler, false, - caller, + lockPriority, ) metrics.ContextCounterAdd(ctx, metrics.HistoryWorkflowExecutionCacheLatency.GetMetricName(), time.Since(start).Nanoseconds()) @@ -133,7 +134,7 @@ func (c *CacheImpl) getOrCreateWorkflowExecutionInternal( execution commonpb.WorkflowExecution, handler metrics.Handler, forceClearContext bool, - caller workflow.CallerType, + lockPriority workflow.LockPriority, ) (workflow.Context, ReleaseCacheFunc, error) { key := definition.NewWorkflowKey(namespaceID.String(), execution.GetWorkflowId(), execution.GetRunId()) @@ -152,9 +153,9 @@ func (c *CacheImpl) getOrCreateWorkflowExecutionInternal( // TODO This will create a closure on every request. // Consider revisiting this if it causes too much GC activity - releaseFunc := c.makeReleaseFunc(key, workflowCtx, forceClearContext, caller) + releaseFunc := c.makeReleaseFunc(key, workflowCtx, forceClearContext, lockPriority) - if err := workflowCtx.Lock(ctx, caller); err != nil { + if err := workflowCtx.Lock(ctx, lockPriority); err != nil { // ctx is done before lock can be acquired c.Release(key) handler.Counter(metrics.CacheFailures.GetMetricName()).Record(1) @@ -168,7 +169,7 @@ func (c *CacheImpl) makeReleaseFunc( key definition.WorkflowKey, context workflow.Context, forceClearContext bool, - caller workflow.CallerType, + lockPriority workflow.LockPriority, ) func(error) { status := cacheNotReleased @@ -176,7 +177,7 @@ func (c *CacheImpl) makeReleaseFunc( if atomic.CompareAndSwapInt32(&status, cacheNotReleased, cacheReleased) { if rec := recover(); rec != nil { context.Clear() - context.Unlock(caller) + context.Unlock(lockPriority) c.Release(key) panic(rec) } else { @@ -184,7 +185,7 @@ func (c *CacheImpl) makeReleaseFunc( // TODO see issue #668, there are certain type or errors which can bypass the clear context.Clear() } - context.Unlock(caller) + context.Unlock(lockPriority) c.Release(key) } } diff --git a/service/history/workflow/cache/cache_mock.go b/service/history/workflow/cache/cache_mock.go index 523ae1554700..69bcfd8a3d65 100644 --- a/service/history/workflow/cache/cache_mock.go +++ b/service/history/workflow/cache/cache_mock.go @@ -62,9 +62,9 @@ func (m *MockCache) EXPECT() *MockCacheMockRecorder { } // GetOrCreateWorkflowExecution mocks base method. -func (m *MockCache) GetOrCreateWorkflowExecution(ctx context.Context, namespaceID namespace.ID, execution v1.WorkflowExecution, caller workflow.CallerType) (workflow.Context, ReleaseCacheFunc, error) { +func (m *MockCache) GetOrCreateWorkflowExecution(ctx context.Context, namespaceID namespace.ID, execution v1.WorkflowExecution, lockPriority workflow.LockPriority) (workflow.Context, ReleaseCacheFunc, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetOrCreateWorkflowExecution", ctx, namespaceID, execution, caller) + ret := m.ctrl.Call(m, "GetOrCreateWorkflowExecution", ctx, namespaceID, execution, lockPriority) ret0, _ := ret[0].(workflow.Context) ret1, _ := ret[1].(ReleaseCacheFunc) ret2, _ := ret[2].(error) @@ -72,7 +72,7 @@ func (m *MockCache) GetOrCreateWorkflowExecution(ctx context.Context, namespaceI } // GetOrCreateWorkflowExecution indicates an expected call of GetOrCreateWorkflowExecution. -func (mr *MockCacheMockRecorder) GetOrCreateWorkflowExecution(ctx, namespaceID, execution, caller interface{}) *gomock.Call { +func (mr *MockCacheMockRecorder) GetOrCreateWorkflowExecution(ctx, namespaceID, execution, lockPriority interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetOrCreateWorkflowExecution", reflect.TypeOf((*MockCache)(nil).GetOrCreateWorkflowExecution), ctx, namespaceID, execution, caller) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetOrCreateWorkflowExecution", reflect.TypeOf((*MockCache)(nil).GetOrCreateWorkflowExecution), ctx, namespaceID, execution, lockPriority) } diff --git a/service/history/workflow/cache/cache_test.go b/service/history/workflow/cache/cache_test.go index 78cbc44b5c4e..5f9065bde729 100644 --- a/service/history/workflow/cache/cache_test.go +++ b/service/history/workflow/cache/cache_test.go @@ -106,7 +106,7 @@ func (s *workflowCacheSuite) TestHistoryCacheBasic() { context.Background(), namespaceID, execution1, - workflow.CallerTypeAPI, + workflow.LockPriorityHigh, ) s.Nil(err) ctx.(*workflow.ContextImpl).MutableState = mockMS1 @@ -115,7 +115,7 @@ func (s *workflowCacheSuite) TestHistoryCacheBasic() { context.Background(), namespaceID, execution1, - workflow.CallerTypeAPI, + workflow.LockPriorityHigh, ) s.Nil(err) s.Equal(mockMS1, ctx.(*workflow.ContextImpl).MutableState) @@ -129,7 +129,7 @@ func (s *workflowCacheSuite) TestHistoryCacheBasic() { context.Background(), namespaceID, execution2, - workflow.CallerTypeAPI, + workflow.LockPriorityHigh, ) s.Nil(err) s.NotEqual(mockMS1, ctx.(*workflow.ContextImpl).MutableState) @@ -149,7 +149,7 @@ func (s *workflowCacheSuite) TestHistoryCachePinning() { context.Background(), namespaceID, we, - workflow.CallerTypeAPI, + workflow.LockPriorityHigh, ) s.Nil(err) @@ -163,7 +163,7 @@ func (s *workflowCacheSuite) TestHistoryCachePinning() { context.Background(), namespaceID, we2, - workflow.CallerTypeAPI, + workflow.LockPriorityHigh, ) s.NotNil(err2) @@ -174,7 +174,7 @@ func (s *workflowCacheSuite) TestHistoryCachePinning() { context.Background(), namespaceID, we2, - workflow.CallerTypeAPI, + workflow.LockPriorityHigh, ) s.Nil(err3) release2(err3) @@ -184,7 +184,7 @@ func (s *workflowCacheSuite) TestHistoryCachePinning() { context.Background(), namespaceID, we, - workflow.CallerTypeAPI, + workflow.LockPriorityHigh, ) s.Nil(err4) s.False(ctx == newContext) @@ -204,7 +204,7 @@ func (s *workflowCacheSuite) TestHistoryCacheClear() { context.Background(), namespaceID, we, - workflow.CallerTypeAPI, + workflow.LockPriorityHigh, ) s.Nil(err) // since we are just testing whether the release function will clear the cache @@ -220,7 +220,7 @@ func (s *workflowCacheSuite) TestHistoryCacheClear() { context.Background(), namespaceID, we, - workflow.CallerTypeAPI, + workflow.LockPriorityHigh, ) s.Nil(err) @@ -235,7 +235,7 @@ func (s *workflowCacheSuite) TestHistoryCacheClear() { context.Background(), namespaceID, we, - workflow.CallerTypeAPI, + workflow.LockPriorityHigh, ) s.Nil(err) s.Nil(ctx.(*workflow.ContextImpl).MutableState) @@ -270,7 +270,7 @@ func (s *workflowCacheSuite) TestHistoryCacheConcurrentAccess_Release() { WorkflowId: workflowId, RunId: runID, }, - workflow.CallerTypeAPI, + workflow.LockPriorityHigh, ) s.Nil(err) // since each time the is reset to nil @@ -296,7 +296,7 @@ func (s *workflowCacheSuite) TestHistoryCacheConcurrentAccess_Release() { WorkflowId: workflowId, RunId: runID, }, - workflow.CallerTypeAPI, + workflow.LockPriorityHigh, ) s.Nil(err) // since we are just testing whether the release function will clear the cache @@ -343,7 +343,7 @@ func (s *workflowCacheSuite) TestHistoryCacheConcurrentAccess_Pin() { WorkflowId: workflowID, RunId: runID, }, - workflow.CallerTypeAPI, + workflow.LockPriorityHigh, ) if err == nil { break diff --git a/service/history/workflow/context.go b/service/history/workflow/context.go index a9d39f82a624..99dc897ee778 100644 --- a/service/history/workflow/context.go +++ b/service/history/workflow/context.go @@ -61,12 +61,12 @@ const ( ) const ( - CallerTypeAPI CallerType = 0 - CallerTypeTask CallerType = 1 + LockPriorityHigh LockPriority = 0 + LockPriorityLow LockPriority = 1 ) type ( - CallerType int + LockPriority int Context interface { GetWorkflowKey() definition.WorkflowKey @@ -75,8 +75,8 @@ type ( LoadExecutionStats(ctx context.Context) (*persistencespb.ExecutionStats, error) Clear() - Lock(ctx context.Context, caller CallerType) error - Unlock(caller CallerType) + Lock(ctx context.Context, lockPriority LockPriority) error + Unlock(lockPriority LockPriority) GetHistorySize() int64 SetHistorySize(size int64) @@ -189,28 +189,28 @@ func NewContext( func (c *ContextImpl) Lock( ctx context.Context, - caller CallerType, + lockPriority LockPriority, ) error { - switch caller { - case CallerTypeAPI: + switch lockPriority { + case LockPriorityHigh: return c.mutex.LockHigh(ctx) - case CallerTypeTask: + case LockPriorityLow: return c.mutex.LockLow(ctx) default: - panic(fmt.Sprintf("unknown caller type: %v", caller)) + panic(fmt.Sprintf("unknown lock priority: %v", lockPriority)) } } func (c *ContextImpl) Unlock( - caller CallerType, + lockPriority LockPriority, ) { - switch caller { - case CallerTypeAPI: + switch lockPriority { + case LockPriorityHigh: c.mutex.UnlockHigh() - case CallerTypeTask: + case LockPriorityLow: c.mutex.UnlockLow() default: - panic(fmt.Sprintf("unknown caller type: %v", caller)) + panic(fmt.Sprintf("unknown lock priority: %v", lockPriority)) } } diff --git a/service/history/workflow/context_mock.go b/service/history/workflow/context_mock.go index 6ce51318a96b..f09d567c646f 100644 --- a/service/history/workflow/context_mock.go +++ b/service/history/workflow/context_mock.go @@ -161,17 +161,17 @@ func (mr *MockContextMockRecorder) LoadMutableState(ctx interface{}) *gomock.Cal } // Lock mocks base method. -func (m *MockContext) Lock(ctx context.Context, caller CallerType) error { +func (m *MockContext) Lock(ctx context.Context, lockPriority LockPriority) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Lock", ctx, caller) + ret := m.ctrl.Call(m, "Lock", ctx, lockPriority) ret0, _ := ret[0].(error) return ret0 } // Lock indicates an expected call of Lock. -func (mr *MockContextMockRecorder) Lock(ctx, caller interface{}) *gomock.Call { +func (mr *MockContextMockRecorder) Lock(ctx, lockPriority interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Lock", reflect.TypeOf((*MockContext)(nil).Lock), ctx, caller) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Lock", reflect.TypeOf((*MockContext)(nil).Lock), ctx, lockPriority) } // PersistWorkflowEvents mocks base method. @@ -230,15 +230,15 @@ func (mr *MockContextMockRecorder) SetWorkflowExecution(ctx, now interface{}) *g } // Unlock mocks base method. -func (m *MockContext) Unlock(caller CallerType) { +func (m *MockContext) Unlock(lockPriority LockPriority) { m.ctrl.T.Helper() - m.ctrl.Call(m, "Unlock", caller) + m.ctrl.Call(m, "Unlock", lockPriority) } // Unlock indicates an expected call of Unlock. -func (mr *MockContextMockRecorder) Unlock(caller interface{}) *gomock.Call { +func (mr *MockContextMockRecorder) Unlock(lockPriority interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Unlock", reflect.TypeOf((*MockContext)(nil).Unlock), caller) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Unlock", reflect.TypeOf((*MockContext)(nil).Unlock), lockPriority) } // UpdateWorkflowExecutionAsActive mocks base method. diff --git a/service/history/workflowRebuilder.go b/service/history/workflowRebuilder.go index 892b27dab564..07641fd02baa 100644 --- a/service/history/workflowRebuilder.go +++ b/service/history/workflowRebuilder.go @@ -85,6 +85,7 @@ func (r *workflowRebuilderImpl) rebuild( nil, api.BypassMutableStateConsistencyPredicate, workflowKey, + workflow.LockPriorityHigh, ) if err != nil { return err diff --git a/service/history/workflowTaskHandlerCallbacks.go b/service/history/workflowTaskHandlerCallbacks.go index fb048eed51e8..8f57f5cfdb07 100644 --- a/service/history/workflowTaskHandlerCallbacks.go +++ b/service/history/workflowTaskHandlerCallbacks.go @@ -364,6 +364,7 @@ func (handler *workflowTaskHandlerCallbacksImpl) handleWorkflowTaskCompleted( token.WorkflowId, token.RunId, ), + workflow.LockPriorityHigh, ) if err != nil { return nil, err @@ -733,6 +734,7 @@ func (handler *workflowTaskHandlerCallbacksImpl) verifyFirstWorkflowTaskSchedule req.WorkflowExecution.WorkflowId, req.WorkflowExecution.RunId, ), + workflow.LockPriorityLow, ) if err != nil { return err