diff --git a/service/history/api/consistency_checker.go b/service/history/api/consistency_checker.go index 18080e5c218..5fa3f600572 100644 --- a/service/history/api/consistency_checker.go +++ b/service/history/api/consistency_checker.go @@ -59,6 +59,7 @@ type ( reqClock *clockspb.VectorClock, consistencyPredicate MutableStateConsistencyPredicate, workflowKey definition.WorkflowKey, + lockPriority workflow.LockPriority, ) (WorkflowContext, error) } @@ -108,6 +109,7 @@ func (c *WorkflowConsistencyCheckerImpl) GetWorkflowContext( reqClock *clockspb.VectorClock, consistencyPredicate MutableStateConsistencyPredicate, workflowKey definition.WorkflowKey, + lockPriority workflow.LockPriority, ) (WorkflowContext, error) { if reqClock != nil { currentClock := c.shardContext.CurrentVectorClock() @@ -117,6 +119,7 @@ func (c *WorkflowConsistencyCheckerImpl) GetWorkflowContext( reqClock, currentClock, workflowKey, + lockPriority, ) } // request vector clock cannot is not comparable with current shard vector clock @@ -133,6 +136,7 @@ func (c *WorkflowConsistencyCheckerImpl) GetWorkflowContext( &shardOwnershipAsserted, consistencyPredicate, workflowKey, + lockPriority, ) } return c.getCurrentWorkflowContext( @@ -141,6 +145,7 @@ func (c *WorkflowConsistencyCheckerImpl) GetWorkflowContext( consistencyPredicate, workflowKey.NamespaceID, workflowKey.WorkflowID, + lockPriority, ) } @@ -149,6 +154,7 @@ func (c *WorkflowConsistencyCheckerImpl) getWorkflowContextValidatedByClock( reqClock *clockspb.VectorClock, currentClock *clockspb.VectorClock, workflowKey definition.WorkflowKey, + lockPriority workflow.LockPriority, ) (WorkflowContext, error) { cmpResult, err := vclock.Compare(reqClock, currentClock) if err != nil { @@ -170,7 +176,7 @@ func (c *WorkflowConsistencyCheckerImpl) getWorkflowContextValidatedByClock( WorkflowId: workflowKey.WorkflowID, RunId: workflowKey.RunID, }, - workflow.CallerTypeAPI, + lockPriority, ) if err != nil { return nil, err @@ -189,6 +195,7 @@ func (c *WorkflowConsistencyCheckerImpl) getWorkflowContextValidatedByCheck( shardOwnershipAsserted *bool, consistencyPredicate MutableStateConsistencyPredicate, workflowKey definition.WorkflowKey, + lockPriority workflow.LockPriority, ) (WorkflowContext, error) { if len(workflowKey.RunID) == 0 { return nil, serviceerror.NewInternal(fmt.Sprintf( @@ -203,7 +210,7 @@ func (c *WorkflowConsistencyCheckerImpl) getWorkflowContextValidatedByCheck( WorkflowId: workflowKey.WorkflowID, RunId: workflowKey.RunID, }, - workflow.CallerTypeAPI, + lockPriority, ) if err != nil { return nil, err @@ -245,6 +252,7 @@ func (c *WorkflowConsistencyCheckerImpl) getCurrentWorkflowContext( consistencyPredicate MutableStateConsistencyPredicate, namespaceID string, workflowID string, + lockPriority workflow.LockPriority, ) (WorkflowContext, error) { runID, err := c.getCurrentRunID( ctx, @@ -260,6 +268,7 @@ func (c *WorkflowConsistencyCheckerImpl) getCurrentWorkflowContext( shardOwnershipAsserted, consistencyPredicate, definition.NewWorkflowKey(namespaceID, workflowID, runID), + lockPriority, ) if err != nil { return nil, err diff --git a/service/history/api/consistency_checker_test.go b/service/history/api/consistency_checker_test.go index 62862d9a237..7b288b678fe 100644 --- a/service/history/api/consistency_checker_test.go +++ b/service/history/api/consistency_checker_test.go @@ -115,7 +115,7 @@ func (s *workflowConsistencyCheckerSuite) TestGetWorkflowContextValidatedByCheck WorkflowId: s.workflowID, RunId: s.currentRunID, }, - workflow.CallerTypeAPI, + workflow.LockPriorityHigh, ).Return(wfContext, releaseFn, nil) wfContext.EXPECT().LoadMutableState(ctx).Return(mutableState, nil) @@ -124,6 +124,7 @@ func (s *workflowConsistencyCheckerSuite) TestGetWorkflowContextValidatedByCheck &shardOwnershipAsserted, BypassMutableStateConsistencyPredicate, definition.NewWorkflowKey(s.namespaceID, s.workflowID, s.currentRunID), + workflow.LockPriorityHigh, ) s.NoError(err) s.Equal(mutableState, workflowContext.GetMutableState()) @@ -147,7 +148,7 @@ func (s *workflowConsistencyCheckerSuite) TestGetWorkflowContextValidatedByCheck WorkflowId: s.workflowID, RunId: s.currentRunID, }, - workflow.CallerTypeAPI, + workflow.LockPriorityHigh, ).Return(wfContext, releaseFn, nil) gomock.InOrder( wfContext.EXPECT().LoadMutableState(ctx).Return(mutableState1, nil), @@ -160,6 +161,7 @@ func (s *workflowConsistencyCheckerSuite) TestGetWorkflowContextValidatedByCheck &shardOwnershipAsserted, FailMutableStateConsistencyPredicate, definition.NewWorkflowKey(s.namespaceID, s.workflowID, s.currentRunID), + workflow.LockPriorityHigh, ) s.NoError(err) s.Equal(mutableState2, workflowContext.GetMutableState()) @@ -181,7 +183,7 @@ func (s *workflowConsistencyCheckerSuite) TestGetWorkflowContextValidatedByCheck WorkflowId: s.workflowID, RunId: s.currentRunID, }, - workflow.CallerTypeAPI, + workflow.LockPriorityHigh, ).Return(wfContext, releaseFn, nil) wfContext.EXPECT().LoadMutableState(ctx).Return(nil, serviceerror.NewNotFound("")) @@ -192,6 +194,7 @@ func (s *workflowConsistencyCheckerSuite) TestGetWorkflowContextValidatedByCheck &shardOwnershipAsserted, FailMutableStateConsistencyPredicate, definition.NewWorkflowKey(s.namespaceID, s.workflowID, s.currentRunID), + workflow.LockPriorityHigh, ) s.IsType(&serviceerror.NotFound{}, err) s.Nil(workflowContext) @@ -213,7 +216,7 @@ func (s *workflowConsistencyCheckerSuite) TestGetWorkflowContextValidatedByCheck WorkflowId: s.workflowID, RunId: s.currentRunID, }, - workflow.CallerTypeAPI, + workflow.LockPriorityHigh, ).Return(wfContext, releaseFn, nil) wfContext.EXPECT().LoadMutableState(ctx).Return(nil, serviceerror.NewNotFound("")) @@ -224,6 +227,7 @@ func (s *workflowConsistencyCheckerSuite) TestGetWorkflowContextValidatedByCheck &shardOwnershipAsserted, FailMutableStateConsistencyPredicate, definition.NewWorkflowKey(s.namespaceID, s.workflowID, s.currentRunID), + workflow.LockPriorityHigh, ) s.IsType(&persistence.ShardOwnershipLostError{}, err) s.Nil(workflowContext) @@ -245,7 +249,7 @@ func (s *workflowConsistencyCheckerSuite) TestGetWorkflowContextValidatedByCheck WorkflowId: s.workflowID, RunId: s.currentRunID, }, - workflow.CallerTypeAPI, + workflow.LockPriorityHigh, ).Return(wfContext, releaseFn, nil) wfContext.EXPECT().LoadMutableState(ctx).Return(nil, serviceerror.NewUnavailable("")) @@ -254,6 +258,7 @@ func (s *workflowConsistencyCheckerSuite) TestGetWorkflowContextValidatedByCheck &shardOwnershipAsserted, FailMutableStateConsistencyPredicate, definition.NewWorkflowKey(s.namespaceID, s.workflowID, s.currentRunID), + workflow.LockPriorityHigh, ) s.IsType(&serviceerror.Unavailable{}, err) s.Nil(workflowContext) diff --git a/service/history/api/deleteworkflow/api.go b/service/history/api/deleteworkflow/api.go index be84bcbf4ae..368bd269605 100644 --- a/service/history/api/deleteworkflow/api.go +++ b/service/history/api/deleteworkflow/api.go @@ -55,6 +55,7 @@ func Invoke( request.WorkflowExecution.WorkflowId, request.WorkflowExecution.RunId, ), + workflow.LockPriorityLow, ) if err != nil { return nil, err diff --git a/service/history/api/describemutablestate/api.go b/service/history/api/describemutablestate/api.go index b7fdda2b4a3..1549f0a80d3 100644 --- a/service/history/api/describemutablestate/api.go +++ b/service/history/api/describemutablestate/api.go @@ -56,6 +56,7 @@ func Invoke( req.Execution.WorkflowId, req.Execution.RunId, ), + workflow.LockPriorityHigh, ) if err != nil { return nil, err diff --git a/service/history/api/describeworkflow/api.go b/service/history/api/describeworkflow/api.go index aa36850c398..46bcb364bf4 100644 --- a/service/history/api/describeworkflow/api.go +++ b/service/history/api/describeworkflow/api.go @@ -68,6 +68,7 @@ func Invoke( req.Request.Execution.WorkflowId, req.Request.Execution.RunId, ), + workflow.LockPriorityHigh, ) if err != nil { return nil, err diff --git a/service/history/api/get_workflow_util.go b/service/history/api/get_workflow_util.go index 0020b7b50de..c21fe5b77ff 100644 --- a/service/history/api/get_workflow_util.go +++ b/service/history/api/get_workflow_util.go @@ -162,6 +162,7 @@ func GetMutableState( nil, BypassMutableStateConsistencyPredicate, workflowKey, + workflow.LockPriorityHigh, ) if err != nil { return nil, err diff --git a/service/history/api/queryworkflow/api.go b/service/history/api/queryworkflow/api.go index 7e75e98e04f..5dec27bd5fd 100644 --- a/service/history/api/queryworkflow/api.go +++ b/service/history/api/queryworkflow/api.go @@ -86,6 +86,7 @@ func Invoke( nil, api.BypassMutableStateConsistencyPredicate, workflowKey, + workflow.LockPriorityHigh, ) if err != nil { return nil, err diff --git a/service/history/api/refreshworkflow/api.go b/service/history/api/refreshworkflow/api.go index 35e8cd61bbc..f21cb33b2d4 100644 --- a/service/history/api/refreshworkflow/api.go +++ b/service/history/api/refreshworkflow/api.go @@ -51,6 +51,7 @@ func Invoke( nil, api.BypassMutableStateConsistencyPredicate, workflowKey, + workflow.LockPriorityLow, ) if err != nil { return err diff --git a/service/history/api/replication/generate_task.go b/service/history/api/replication/generate_task.go index be35b0870e0..20ab90b23f9 100644 --- a/service/history/api/replication/generate_task.go +++ b/service/history/api/replication/generate_task.go @@ -34,6 +34,7 @@ import ( "go.temporal.io/server/service/history/api" "go.temporal.io/server/service/history/shard" "go.temporal.io/server/service/history/tasks" + "go.temporal.io/server/service/history/workflow" ) func GenerateTask( @@ -57,6 +58,7 @@ func GenerateTask( request.Execution.WorkflowId, request.Execution.RunId, ), + workflow.LockPriorityHigh, ) if err != nil { return nil, err diff --git a/service/history/api/resetworkflow/api.go b/service/history/api/resetworkflow/api.go index a13c5e79ef2..e15c2a4fa59 100644 --- a/service/history/api/resetworkflow/api.go +++ b/service/history/api/resetworkflow/api.go @@ -39,6 +39,7 @@ import ( "go.temporal.io/server/service/history/api" "go.temporal.io/server/service/history/ndc" "go.temporal.io/server/service/history/shard" + "go.temporal.io/server/service/history/workflow" ) func Invoke( @@ -66,6 +67,7 @@ func Invoke( workflowID, baseRunID, ), + workflow.LockPriorityHigh, ) if err != nil { return nil, err @@ -104,6 +106,7 @@ func Invoke( workflowID, currentRunID, ), + workflow.LockPriorityHigh, ) if err != nil { return nil, err diff --git a/service/history/api/signalwithstartworkflow/api.go b/service/history/api/signalwithstartworkflow/api.go index fc90a146985..8ba98bd5cf6 100644 --- a/service/history/api/signalwithstartworkflow/api.go +++ b/service/history/api/signalwithstartworkflow/api.go @@ -35,6 +35,7 @@ import ( "go.temporal.io/server/common/namespace" "go.temporal.io/server/service/history/api" "go.temporal.io/server/service/history/shard" + "go.temporal.io/server/service/history/workflow" ) func Invoke( @@ -59,6 +60,7 @@ func Invoke( signalWithStartRequest.SignalWithStartRequest.WorkflowId, "", ), + workflow.LockPriorityHigh, ) switch err.(type) { case nil: diff --git a/service/history/api/startworkflow/api.go b/service/history/api/startworkflow/api.go index ffaccfa8dbb..1ffe653ba76 100644 --- a/service/history/api/startworkflow/api.go +++ b/service/history/api/startworkflow/api.go @@ -194,7 +194,7 @@ func (s *Starter) lockCurrentWorkflowExecution( ctx, s.namespace.ID(), s.request.StartRequest.WorkflowId, - workflow.CallerTypeAPI, + workflow.LockPriorityHigh, ) if err != nil { return nil, err @@ -437,7 +437,7 @@ func (s *Starter) getMutableStateInfo(ctx context.Context, runID string) (*mutab ctx, s.namespace.ID(), commonpb.WorkflowExecution{WorkflowId: s.request.StartRequest.WorkflowId, RunId: runID}, - workflow.CallerTypeAPI, + workflow.LockPriorityHigh, ) if err != nil { return nil, err diff --git a/service/history/api/update_workflow_util.go b/service/history/api/update_workflow_util.go index 13385b450f0..6370c2efd51 100644 --- a/service/history/api/update_workflow_util.go +++ b/service/history/api/update_workflow_util.go @@ -49,6 +49,7 @@ func GetAndUpdateWorkflowWithNew( reqClock, consistencyCheckFn, workflowKey, + workflow.LockPriorityHigh, ) if err != nil { return err diff --git a/service/history/api/updateworkflow/api.go b/service/history/api/updateworkflow/api.go index c5b1fee5c69..ead865e677b 100644 --- a/service/history/api/updateworkflow/api.go +++ b/service/history/api/updateworkflow/api.go @@ -60,6 +60,7 @@ func Invoke( req.Request.WorkflowExecution.WorkflowId, req.Request.WorkflowExecution.RunId, ), + workflow.LockPriorityHigh, ) if err != nil { return nil, err diff --git a/service/history/api/verifychildworkflowcompletionrecorded/api.go b/service/history/api/verifychildworkflowcompletionrecorded/api.go index 753b4a58b6f..5fc8ec88fdc 100644 --- a/service/history/api/verifychildworkflowcompletionrecorded/api.go +++ b/service/history/api/verifychildworkflowcompletionrecorded/api.go @@ -35,6 +35,7 @@ import ( "go.temporal.io/server/service/history/api" "go.temporal.io/server/service/history/consts" "go.temporal.io/server/service/history/shard" + "go.temporal.io/server/service/history/workflow" ) func Invoke( @@ -61,6 +62,7 @@ func Invoke( request.ParentExecution.WorkflowId, request.ParentExecution.RunId, ), + workflow.LockPriorityLow, ) if err != nil { return nil, err diff --git a/service/history/historyEngine2_test.go b/service/history/historyEngine2_test.go index ebf08e3485c..c087c6e5df5 100644 --- a/service/history/historyEngine2_test.go +++ b/service/history/historyEngine2_test.go @@ -466,7 +466,7 @@ func (s *engine2Suite) TestRecordWorkflowTaskStartedSuccess() { metrics.AddMetricsContext(context.Background()), tests.NamespaceID, workflowExecution, - workflow.CallerTypeAPI, + workflow.LockPriorityHigh, ) s.NoError(err) loadedMS, err := ctx.LoadMutableState(context.Background()) @@ -1946,7 +1946,7 @@ func (s *engine2Suite) getMutableState(namespaceID namespace.ID, we commonpb.Wor metrics.AddMetricsContext(context.Background()), namespaceID, we, - workflow.CallerTypeAPI, + workflow.LockPriorityHigh, ) if err != nil { return nil diff --git a/service/history/historyEngine_test.go b/service/history/historyEngine_test.go index 59e9b569db9..22d95160015 100644 --- a/service/history/historyEngine_test.go +++ b/service/history/historyEngine_test.go @@ -653,7 +653,7 @@ func (s *engineSuite) TestQueryWorkflow_ConsistentQueryBufferFull() { context.Background(), tests.NamespaceID, execution, - workflow.CallerTypeAPI, + workflow.LockPriorityHigh, ) s.NoError(err) loadedMS, err := ctx.LoadMutableState(context.Background()) @@ -4365,7 +4365,7 @@ func (s *engineSuite) TestRequestCancel_RespondWorkflowTaskCompleted_SuccessWith context.Background(), tests.NamespaceID, we, - workflow.CallerTypeAPI, + workflow.LockPriorityHigh, ) s.NoError(err) loadedMS, err := ctx.LoadMutableState(context.Background()) @@ -5230,7 +5230,7 @@ func (s *engineSuite) getMutableState(testNamespaceID namespace.ID, we commonpb. context.Background(), tests.NamespaceID, we, - workflow.CallerTypeAPI, + workflow.LockPriorityHigh, ) if err != nil { return nil diff --git a/service/history/ndc/activity_replicator.go b/service/history/ndc/activity_replicator.go index f099497de70..55751bd9431 100644 --- a/service/history/ndc/activity_replicator.go +++ b/service/history/ndc/activity_replicator.go @@ -106,7 +106,7 @@ func (r *ActivityReplicatorImpl) SyncActivity( ctx, namespaceID, execution, - workflow.CallerTypeAPI, + workflow.LockPriorityHigh, ) if err != nil { // for get workflow execution context, with valid run id diff --git a/service/history/ndc/activity_replicator_test.go b/service/history/ndc/activity_replicator_test.go index 88a5afbcdae..3cf72a3d2da 100644 --- a/service/history/ndc/activity_replicator_test.go +++ b/service/history/ndc/activity_replicator_test.go @@ -642,8 +642,8 @@ func (s *activityReplicatorSuite) TestSyncActivity_WorkflowClosed() { key := definition.NewWorkflowKey(namespaceID.String(), workflowID, runID) weContext := workflow.NewMockContext(s.controller) weContext.EXPECT().LoadMutableState(gomock.Any()).Return(s.mockMutableState, nil) - weContext.EXPECT().Lock(gomock.Any(), workflow.CallerTypeAPI).Return(nil) - weContext.EXPECT().Unlock(workflow.CallerTypeAPI) + weContext.EXPECT().Lock(gomock.Any(), workflow.LockPriorityHigh).Return(nil) + weContext.EXPECT().Unlock(workflow.LockPriorityHigh) _, err := s.workflowCache.PutIfNotExist(key, weContext) s.NoError(err) @@ -716,8 +716,8 @@ func (s *activityReplicatorSuite) TestSyncActivity_ActivityNotFound() { key := definition.NewWorkflowKey(namespaceID.String(), workflowID, runID) weContext := workflow.NewMockContext(s.controller) weContext.EXPECT().LoadMutableState(gomock.Any()).Return(s.mockMutableState, nil) - weContext.EXPECT().Lock(gomock.Any(), workflow.CallerTypeAPI).Return(nil) - weContext.EXPECT().Unlock(workflow.CallerTypeAPI) + weContext.EXPECT().Lock(gomock.Any(), workflow.LockPriorityHigh).Return(nil) + weContext.EXPECT().Unlock(workflow.LockPriorityHigh) _, err := s.workflowCache.PutIfNotExist(key, weContext) s.NoError(err) @@ -791,8 +791,8 @@ func (s *activityReplicatorSuite) TestSyncActivity_ActivityFound_Zombie() { key := definition.NewWorkflowKey(namespaceID.String(), workflowID, runID) weContext := workflow.NewMockContext(s.controller) weContext.EXPECT().LoadMutableState(gomock.Any()).Return(s.mockMutableState, nil) - weContext.EXPECT().Lock(gomock.Any(), workflow.CallerTypeAPI).Return(nil) - weContext.EXPECT().Unlock(workflow.CallerTypeAPI) + weContext.EXPECT().Lock(gomock.Any(), workflow.LockPriorityHigh).Return(nil) + weContext.EXPECT().Unlock(workflow.LockPriorityHigh) _, err := s.workflowCache.PutIfNotExist(key, weContext) s.NoError(err) @@ -883,8 +883,8 @@ func (s *activityReplicatorSuite) TestSyncActivity_ActivityFound_NonZombie() { key := definition.NewWorkflowKey(namespaceID.String(), workflowID, runID) weContext := workflow.NewMockContext(s.controller) weContext.EXPECT().LoadMutableState(gomock.Any()).Return(s.mockMutableState, nil) - weContext.EXPECT().Lock(gomock.Any(), workflow.CallerTypeAPI).Return(nil) - weContext.EXPECT().Unlock(workflow.CallerTypeAPI) + weContext.EXPECT().Lock(gomock.Any(), workflow.LockPriorityHigh).Return(nil) + weContext.EXPECT().Unlock(workflow.LockPriorityHigh) _, err := s.workflowCache.PutIfNotExist(key, weContext) s.NoError(err) diff --git a/service/history/ndc/history_replicator.go b/service/history/ndc/history_replicator.go index a7b772e4e44..d0c422de06b 100644 --- a/service/history/ndc/history_replicator.go +++ b/service/history/ndc/history_replicator.go @@ -248,7 +248,7 @@ func (r *HistoryReplicatorImpl) ApplyWorkflowState( WorkflowId: wid, RunId: rid, }, - workflow.CallerTypeTask, + workflow.LockPriorityLow, ) if err != nil { return err @@ -357,7 +357,7 @@ func (r *HistoryReplicatorImpl) applyEvents( ctx, task.getNamespaceID(), *task.getExecution(), - workflow.CallerTypeAPI, + workflow.LockPriorityHigh, ) if err != nil { // for get workflow execution context, with valid run id diff --git a/service/history/ndc/history_replicator_test.go b/service/history/ndc/history_replicator_test.go index 5633b1e9d8f..eb2147aff9c 100644 --- a/service/history/ndc/history_replicator_test.go +++ b/service/history/ndc/history_replicator_test.go @@ -177,7 +177,7 @@ func (s *historyReplicatorSuite) Test_ApplyWorkflowState_BrandNew() { gomock.Any(), namespace.ID(namespaceID), we, - workflow.CallerTypeTask, + workflow.LockPriorityLow, ).Return(mockWeCtx, wcache.NoopReleaseFn, nil) mockWeCtx.EXPECT().CreateWorkflowExecution( gomock.Any(), @@ -279,7 +279,7 @@ func (s *historyReplicatorSuite) Test_ApplyWorkflowState_Ancestors() { gomock.Any(), namespace.ID(namespaceID), we, - workflow.CallerTypeTask, + workflow.LockPriorityLow, ).Return(mockWeCtx, wcache.NoopReleaseFn, nil) mockWeCtx.EXPECT().CreateWorkflowExecution( gomock.Any(), diff --git a/service/history/ndc/transaction_manager.go b/service/history/ndc/transaction_manager.go index 07bed65cf47..cce2b2a5d04 100644 --- a/service/history/ndc/transaction_manager.go +++ b/service/history/ndc/transaction_manager.go @@ -444,7 +444,7 @@ func (r *transactionMgrImpl) loadWorkflow( WorkflowId: workflowID, RunId: runID, }, - workflow.CallerTypeAPI, + workflow.LockPriorityHigh, ) if err != nil { return nil, err diff --git a/service/history/ndc/workflow_resetter.go b/service/history/ndc/workflow_resetter.go index 5d58f980558..05cec5f59cc 100644 --- a/service/history/ndc/workflow_resetter.go +++ b/service/history/ndc/workflow_resetter.go @@ -634,7 +634,7 @@ func (r *workflowResetterImpl) reapplyContinueAsNewWorkflowEvents( WorkflowId: workflowID, RunId: runID, }, - workflow.CallerTypeAPI, + workflow.LockPriorityHigh, ) if err != nil { return 0, nil, err diff --git a/service/history/ndc/workflow_resetter_test.go b/service/history/ndc/workflow_resetter_test.go index ad33adeb95e..338344a1e27 100644 --- a/service/history/ndc/workflow_resetter_test.go +++ b/service/history/ndc/workflow_resetter_test.go @@ -703,8 +703,8 @@ func (s *workflowResetterSuite) TestReapplyContinueAsNewWorkflowEvents_WithConti }, nil) resetContext := workflow.NewMockContext(s.controller) - resetContext.EXPECT().Lock(gomock.Any(), workflow.CallerTypeAPI).Return(nil) - resetContext.EXPECT().Unlock(workflow.CallerTypeAPI) + resetContext.EXPECT().Lock(gomock.Any(), workflow.LockPriorityHigh).Return(nil) + resetContext.EXPECT().Unlock(workflow.LockPriorityHigh) resetMutableState := workflow.NewMockMutableState(s.controller) resetContext.EXPECT().LoadMutableState(gomock.Any()).Return(resetMutableState, nil) resetMutableState.EXPECT().GetNextEventID().Return(newNextEventID).AnyTimes() diff --git a/service/history/replication/raw_task_converter.go b/service/history/replication/raw_task_converter.go index de28b4e00a4..5a2647cc32a 100644 --- a/service/history/replication/raw_task_converter.go +++ b/service/history/replication/raw_task_converter.go @@ -240,7 +240,7 @@ func generateStateReplicationTask( WorkflowId: workflowKey.WorkflowID, RunId: workflowKey.RunID, }, - workflow.CallerTypeTask, + workflow.LockPriorityLow, ) if err != nil { return nil, err @@ -303,7 +303,7 @@ func getBranchToken( WorkflowId: workflowKey.WorkflowID, RunId: workflowKey.RunID, }, - workflow.CallerTypeTask, + workflow.LockPriorityLow, ) if err != nil { return nil, nil, nil, err diff --git a/service/history/replication/raw_task_converter_test.go b/service/history/replication/raw_task_converter_test.go index c69a9fcb281..971ab60f7d1 100644 --- a/service/history/replication/raw_task_converter_test.go +++ b/service/history/replication/raw_task_converter_test.go @@ -158,7 +158,7 @@ func (s *rawTaskConverterSuite) TestConvertActivityStateReplicationTask_Workflow WorkflowId: s.workflowID, RunId: s.runID, }, - workflow.CallerTypeTask, + workflow.LockPriorityLow, ).Return(s.workflowContext, s.releaseFn, nil) s.workflowContext.EXPECT().LoadMutableState(gomock.Any()).Return(nil, serviceerror.NewNotFound("")) @@ -191,7 +191,7 @@ func (s *rawTaskConverterSuite) TestConvertActivityStateReplicationTask_Workflow WorkflowId: s.workflowID, RunId: s.runID, }, - workflow.CallerTypeTask, + workflow.LockPriorityLow, ).Return(s.workflowContext, s.releaseFn, nil) s.workflowContext.EXPECT().LoadMutableState(gomock.Any()).Return(s.mutableState, nil) s.mutableState.EXPECT().IsWorkflowExecutionRunning().Return(false).AnyTimes() @@ -225,7 +225,7 @@ func (s *rawTaskConverterSuite) TestConvertActivityStateReplicationTask_Activity WorkflowId: s.workflowID, RunId: s.runID, }, - workflow.CallerTypeTask, + workflow.LockPriorityLow, ).Return(s.workflowContext, s.releaseFn, nil) s.workflowContext.EXPECT().LoadMutableState(gomock.Any()).Return(s.mutableState, nil) s.mutableState.EXPECT().IsWorkflowExecutionRunning().Return(true).AnyTimes() @@ -260,7 +260,7 @@ func (s *rawTaskConverterSuite) TestConvertActivityStateReplicationTask_Activity WorkflowId: s.workflowID, RunId: s.runID, }, - workflow.CallerTypeTask, + workflow.LockPriorityLow, ).Return(s.workflowContext, s.releaseFn, nil) activityVersion := version @@ -362,7 +362,7 @@ func (s *rawTaskConverterSuite) TestConvertActivityStateReplicationTask_Activity WorkflowId: s.workflowID, RunId: s.runID, }, - workflow.CallerTypeTask, + workflow.LockPriorityLow, ).Return(s.workflowContext, s.releaseFn, nil) activityVersion := version @@ -464,7 +464,7 @@ func (s *rawTaskConverterSuite) TestConvertWorkflowStateReplicationTask_Workflow WorkflowId: s.workflowID, RunId: s.runID, }, - workflow.CallerTypeTask, + workflow.LockPriorityLow, ).Return(s.workflowContext, s.releaseFn, nil) s.workflowContext.EXPECT().LoadMutableState(gomock.Any()).Return(s.mutableState, nil) s.mutableState.EXPECT().GetWorkflowStateStatus().Return(enumsspb.WORKFLOW_EXECUTION_STATE_RUNNING, enums.WORKFLOW_EXECUTION_STATUS_RUNNING).AnyTimes() @@ -496,7 +496,7 @@ func (s *rawTaskConverterSuite) TestConvertWorkflowStateReplicationTask_Workflow WorkflowId: s.workflowID, RunId: s.runID, }, - workflow.CallerTypeTask, + workflow.LockPriorityLow, ).Return(s.workflowContext, s.releaseFn, nil) s.workflowContext.EXPECT().LoadMutableState(gomock.Any()).Return(s.mutableState, nil) s.mutableState.EXPECT().CloneToProto().Return(&persistencespb.WorkflowMutableState{ @@ -561,7 +561,7 @@ func (s *rawTaskConverterSuite) TestConvertHistoryReplicationTask_WorkflowMissin WorkflowId: s.workflowID, RunId: s.runID, }, - workflow.CallerTypeTask, + workflow.LockPriorityLow, ).Return(s.workflowContext, s.releaseFn, nil) s.workflowContext.EXPECT().LoadMutableState(gomock.Any()).Return(nil, serviceerror.NewNotFound("")) @@ -622,7 +622,7 @@ func (s *rawTaskConverterSuite) TestConvertHistoryReplicationTask_WithNewRun() { WorkflowId: s.workflowID, RunId: s.runID, }, - workflow.CallerTypeTask, + workflow.LockPriorityLow, ).Return(s.workflowContext, s.releaseFn, nil) s.workflowContext.EXPECT().LoadMutableState(gomock.Any()).Return(s.mutableState, nil) s.mutableState.EXPECT().GetExecutionInfo().Return(&persistencespb.WorkflowExecutionInfo{ @@ -667,7 +667,7 @@ func (s *rawTaskConverterSuite) TestConvertHistoryReplicationTask_WithNewRun() { WorkflowId: s.workflowID, RunId: s.newRunID, }, - workflow.CallerTypeTask, + workflow.LockPriorityLow, ).Return(s.newWorkflowContext, s.releaseFn, nil) s.newWorkflowContext.EXPECT().LoadMutableState(gomock.Any()).Return(s.newMutableState, nil) s.newMutableState.EXPECT().GetExecutionInfo().Return(&persistencespb.WorkflowExecutionInfo{ @@ -758,7 +758,7 @@ func (s *rawTaskConverterSuite) TestConvertHistoryReplicationTask_WithoutNewRun( WorkflowId: s.workflowID, RunId: s.runID, }, - workflow.CallerTypeTask, + workflow.LockPriorityLow, ).Return(s.workflowContext, s.releaseFn, nil) s.workflowContext.EXPECT().LoadMutableState(gomock.Any()).Return(s.mutableState, nil) s.mutableState.EXPECT().GetExecutionInfo().Return(&persistencespb.WorkflowExecutionInfo{ diff --git a/service/history/replication/task_executor.go b/service/history/replication/task_executor.go index bb851479c7b..611b1beebca 100644 --- a/service/history/replication/task_executor.go +++ b/service/history/replication/task_executor.go @@ -363,7 +363,7 @@ func (e *taskExecutorImpl) cleanupWorkflowExecution(ctx context.Context, namespa WorkflowId: workflowID, RunId: runID, } - wfCtx, releaseFn, err := e.workflowCache.GetOrCreateWorkflowExecution(ctx, nsID, ex, workflow.CallerTypeTask) + wfCtx, releaseFn, err := e.workflowCache.GetOrCreateWorkflowExecution(ctx, nsID, ex, workflow.LockPriorityLow) if err != nil { return err } diff --git a/service/history/timerQueueTaskExecutorBase.go b/service/history/timerQueueTaskExecutorBase.go index fa32cf064da..d1e2c95e4f9 100644 --- a/service/history/timerQueueTaskExecutorBase.go +++ b/service/history/timerQueueTaskExecutorBase.go @@ -39,7 +39,7 @@ import ( "go.temporal.io/server/common/persistence" "go.temporal.io/server/service/history/configs" "go.temporal.io/server/service/history/consts" - deletemanager "go.temporal.io/server/service/history/deletemanager" + "go.temporal.io/server/service/history/deletemanager" "go.temporal.io/server/service/history/shard" "go.temporal.io/server/service/history/tasks" "go.temporal.io/server/service/history/workflow" @@ -100,7 +100,7 @@ func (t *timerQueueTaskExecutorBase) executeDeleteHistoryEventTask( ctx, namespace.ID(task.GetNamespaceID()), workflowExecution, - workflow.CallerTypeTask, + workflow.LockPriorityLow, ) if err != nil { return err @@ -177,7 +177,7 @@ func getWorkflowExecutionContext( ctx, namespaceID, execution, - workflow.CallerTypeTask, + workflow.LockPriorityLow, ) if common.IsContextDeadlineExceededErr(err) { err = consts.ErrWorkflowBusy diff --git a/service/history/timerQueueTaskExecutorBase_test.go b/service/history/timerQueueTaskExecutorBase_test.go index 55899eeefae..6813805780f 100644 --- a/service/history/timerQueueTaskExecutorBase_test.go +++ b/service/history/timerQueueTaskExecutorBase_test.go @@ -42,7 +42,7 @@ import ( "go.temporal.io/server/common/cluster" "go.temporal.io/server/common/definition" "go.temporal.io/server/common/metrics" - deletemanager "go.temporal.io/server/service/history/deletemanager" + "go.temporal.io/server/service/history/deletemanager" "go.temporal.io/server/service/history/shard" "go.temporal.io/server/service/history/tasks" "go.temporal.io/server/service/history/tests" @@ -130,7 +130,7 @@ func (s *timerQueueTaskExecutorBaseSuite) Test_executeDeleteHistoryEventTask_NoE mockWeCtx := workflow.NewMockContext(s.controller) mockMutableState := workflow.NewMockMutableState(s.controller) - s.mockCache.EXPECT().GetOrCreateWorkflowExecution(gomock.Any(), tests.NamespaceID, we, workflow.CallerTypeTask).Return(mockWeCtx, wcache.NoopReleaseFn, nil) + s.mockCache.EXPECT().GetOrCreateWorkflowExecution(gomock.Any(), tests.NamespaceID, we, workflow.LockPriorityLow).Return(mockWeCtx, wcache.NoopReleaseFn, nil) mockWeCtx.EXPECT().LoadMutableState(gomock.Any()).Return(mockMutableState, nil) mockMutableState.EXPECT().GetLastWriteVersion().Return(int64(1), nil) @@ -182,7 +182,7 @@ func (s *timerQueueTaskExecutorBaseSuite) TestArchiveHistory_DeleteFailed() { mockWeCtx := workflow.NewMockContext(s.controller) mockMutableState := workflow.NewMockMutableState(s.controller) - s.mockCache.EXPECT().GetOrCreateWorkflowExecution(gomock.Any(), tests.NamespaceID, we, workflow.CallerTypeTask).Return(mockWeCtx, wcache.NoopReleaseFn, nil) + s.mockCache.EXPECT().GetOrCreateWorkflowExecution(gomock.Any(), tests.NamespaceID, we, workflow.LockPriorityLow).Return(mockWeCtx, wcache.NoopReleaseFn, nil) mockWeCtx.EXPECT().LoadMutableState(gomock.Any()).Return(mockMutableState, nil) mockMutableState.EXPECT().GetLastWriteVersion().Return(int64(1), nil) diff --git a/service/history/transferQueueTaskExecutorBase.go b/service/history/transferQueueTaskExecutorBase.go index c56ef7f58cc..18a86c474c0 100644 --- a/service/history/transferQueueTaskExecutorBase.go +++ b/service/history/transferQueueTaskExecutorBase.go @@ -45,7 +45,7 @@ import ( "go.temporal.io/server/common/searchattribute" "go.temporal.io/server/service/history/configs" "go.temporal.io/server/service/history/consts" - deletemanager "go.temporal.io/server/service/history/deletemanager" + "go.temporal.io/server/service/history/deletemanager" "go.temporal.io/server/service/history/queues" "go.temporal.io/server/service/history/shard" "go.temporal.io/server/service/history/tasks" @@ -256,7 +256,7 @@ func (t *transferQueueTaskExecutorBase) deleteExecution( ctx, namespace.ID(task.GetNamespaceID()), workflowExecution, - workflow.CallerTypeTask, + workflow.LockPriorityLow, ) if err != nil { return err diff --git a/service/history/workflow/cache/cache.go b/service/history/workflow/cache/cache.go index ad07adf908d..e5a2fe9cf65 100644 --- a/service/history/workflow/cache/cache.go +++ b/service/history/workflow/cache/cache.go @@ -35,6 +35,7 @@ import ( "github.com/pborman/uuid" commonpb "go.temporal.io/api/common/v1" "go.temporal.io/api/serviceerror" + "go.temporal.io/server/common/cache" "go.temporal.io/server/common/definition" "go.temporal.io/server/common/log" @@ -59,14 +60,14 @@ type ( ctx context.Context, namespaceID namespace.ID, workflowID string, - caller workflow.CallerType, + lockPriority workflow.LockPriority, ) (workflow.Context, ReleaseCacheFunc, error) GetOrCreateWorkflowExecution( ctx context.Context, namespaceID namespace.ID, execution commonpb.WorkflowExecution, - caller workflow.CallerType, + lockPriority workflow.LockPriority, ) (workflow.Context, ReleaseCacheFunc, error) } @@ -108,7 +109,7 @@ func (c *CacheImpl) GetOrCreateCurrentWorkflowExecution( ctx context.Context, namespaceID namespace.ID, workflowID string, - caller workflow.CallerType, + lockPriority workflow.LockPriority, ) (workflow.Context, ReleaseCacheFunc, error) { if err := c.validateWorkflowID(workflowID); err != nil { @@ -132,7 +133,7 @@ func (c *CacheImpl) GetOrCreateCurrentWorkflowExecution( execution, handler, true, - caller, + lockPriority, ) metrics.ContextCounterAdd(ctx, metrics.HistoryWorkflowExecutionCacheLatency.GetMetricName(), time.Since(start).Nanoseconds()) @@ -144,7 +145,7 @@ func (c *CacheImpl) GetOrCreateWorkflowExecution( ctx context.Context, namespaceID namespace.ID, execution commonpb.WorkflowExecution, - caller workflow.CallerType, + lockPriority workflow.LockPriority, ) (workflow.Context, ReleaseCacheFunc, error) { if err := c.validateWorkflowExecutionInfo(ctx, namespaceID, &execution); err != nil { @@ -162,7 +163,7 @@ func (c *CacheImpl) GetOrCreateWorkflowExecution( execution, handler, false, - caller, + lockPriority, ) metrics.ContextCounterAdd(ctx, metrics.HistoryWorkflowExecutionCacheLatency.GetMetricName(), time.Since(start).Nanoseconds()) @@ -176,7 +177,7 @@ func (c *CacheImpl) getOrCreateWorkflowExecutionInternal( execution commonpb.WorkflowExecution, handler metrics.Handler, forceClearContext bool, - caller workflow.CallerType, + lockPriority workflow.LockPriority, ) (workflow.Context, ReleaseCacheFunc, error) { key := definition.NewWorkflowKey(namespaceID.String(), execution.GetWorkflowId(), execution.GetRunId()) @@ -195,9 +196,9 @@ func (c *CacheImpl) getOrCreateWorkflowExecutionInternal( // TODO This will create a closure on every request. // Consider revisiting this if it causes too much GC activity - releaseFunc := c.makeReleaseFunc(key, workflowCtx, forceClearContext, caller) + releaseFunc := c.makeReleaseFunc(key, workflowCtx, forceClearContext, lockPriority) - if err := workflowCtx.Lock(ctx, caller); err != nil { + if err := workflowCtx.Lock(ctx, lockPriority); err != nil { // ctx is done before lock can be acquired c.Release(key) handler.Counter(metrics.CacheFailures.GetMetricName()).Record(1) @@ -211,7 +212,7 @@ func (c *CacheImpl) makeReleaseFunc( key definition.WorkflowKey, context workflow.Context, forceClearContext bool, - caller workflow.CallerType, + lockPriority workflow.LockPriority, ) func(error) { status := cacheNotReleased @@ -219,7 +220,7 @@ func (c *CacheImpl) makeReleaseFunc( if atomic.CompareAndSwapInt32(&status, cacheNotReleased, cacheReleased) { if rec := recover(); rec != nil { context.Clear() - context.Unlock(caller) + context.Unlock(lockPriority) c.Release(key) panic(rec) } else { @@ -227,7 +228,7 @@ func (c *CacheImpl) makeReleaseFunc( // TODO see issue #668, there are certain type or errors which can bypass the clear context.Clear() } - context.Unlock(caller) + context.Unlock(lockPriority) c.Release(key) } } diff --git a/service/history/workflow/cache/cache_mock.go b/service/history/workflow/cache/cache_mock.go index 59233f30b6d..76c4378360f 100644 --- a/service/history/workflow/cache/cache_mock.go +++ b/service/history/workflow/cache/cache_mock.go @@ -62,9 +62,9 @@ func (m *MockCache) EXPECT() *MockCacheMockRecorder { } // GetOrCreateCurrentWorkflowExecution mocks base method. -func (m *MockCache) GetOrCreateCurrentWorkflowExecution(ctx context.Context, namespaceID namespace.ID, workflowID string, caller workflow.CallerType) (workflow.Context, ReleaseCacheFunc, error) { +func (m *MockCache) GetOrCreateCurrentWorkflowExecution(ctx context.Context, namespaceID namespace.ID, workflowID string, lockPriority workflow.LockPriority) (workflow.Context, ReleaseCacheFunc, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetOrCreateCurrentWorkflowExecution", ctx, namespaceID, workflowID, caller) + ret := m.ctrl.Call(m, "GetOrCreateCurrentWorkflowExecution", ctx, namespaceID, workflowID, lockPriority) ret0, _ := ret[0].(workflow.Context) ret1, _ := ret[1].(ReleaseCacheFunc) ret2, _ := ret[2].(error) @@ -72,15 +72,15 @@ func (m *MockCache) GetOrCreateCurrentWorkflowExecution(ctx context.Context, nam } // GetOrCreateCurrentWorkflowExecution indicates an expected call of GetOrCreateCurrentWorkflowExecution. -func (mr *MockCacheMockRecorder) GetOrCreateCurrentWorkflowExecution(ctx, namespaceID, workflowID, caller interface{}) *gomock.Call { +func (mr *MockCacheMockRecorder) GetOrCreateCurrentWorkflowExecution(ctx, namespaceID, workflowID, lockPriority interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetOrCreateCurrentWorkflowExecution", reflect.TypeOf((*MockCache)(nil).GetOrCreateCurrentWorkflowExecution), ctx, namespaceID, workflowID, caller) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetOrCreateCurrentWorkflowExecution", reflect.TypeOf((*MockCache)(nil).GetOrCreateCurrentWorkflowExecution), ctx, namespaceID, workflowID, lockPriority) } // GetOrCreateWorkflowExecution mocks base method. -func (m *MockCache) GetOrCreateWorkflowExecution(ctx context.Context, namespaceID namespace.ID, execution v1.WorkflowExecution, caller workflow.CallerType) (workflow.Context, ReleaseCacheFunc, error) { +func (m *MockCache) GetOrCreateWorkflowExecution(ctx context.Context, namespaceID namespace.ID, execution v1.WorkflowExecution, lockPriority workflow.LockPriority) (workflow.Context, ReleaseCacheFunc, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetOrCreateWorkflowExecution", ctx, namespaceID, execution, caller) + ret := m.ctrl.Call(m, "GetOrCreateWorkflowExecution", ctx, namespaceID, execution, lockPriority) ret0, _ := ret[0].(workflow.Context) ret1, _ := ret[1].(ReleaseCacheFunc) ret2, _ := ret[2].(error) @@ -88,7 +88,7 @@ func (m *MockCache) GetOrCreateWorkflowExecution(ctx context.Context, namespaceI } // GetOrCreateWorkflowExecution indicates an expected call of GetOrCreateWorkflowExecution. -func (mr *MockCacheMockRecorder) GetOrCreateWorkflowExecution(ctx, namespaceID, execution, caller interface{}) *gomock.Call { +func (mr *MockCacheMockRecorder) GetOrCreateWorkflowExecution(ctx, namespaceID, execution, lockPriority interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetOrCreateWorkflowExecution", reflect.TypeOf((*MockCache)(nil).GetOrCreateWorkflowExecution), ctx, namespaceID, execution, caller) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetOrCreateWorkflowExecution", reflect.TypeOf((*MockCache)(nil).GetOrCreateWorkflowExecution), ctx, namespaceID, execution, lockPriority) } diff --git a/service/history/workflow/cache/cache_test.go b/service/history/workflow/cache/cache_test.go index cb347d0e2d0..b9574ed1e26 100644 --- a/service/history/workflow/cache/cache_test.go +++ b/service/history/workflow/cache/cache_test.go @@ -106,7 +106,7 @@ func (s *workflowCacheSuite) TestHistoryCacheBasic() { context.Background(), namespaceID, execution1, - workflow.CallerTypeAPI, + workflow.LockPriorityHigh, ) s.Nil(err) ctx.(*workflow.ContextImpl).MutableState = mockMS1 @@ -115,7 +115,7 @@ func (s *workflowCacheSuite) TestHistoryCacheBasic() { context.Background(), namespaceID, execution1, - workflow.CallerTypeAPI, + workflow.LockPriorityHigh, ) s.Nil(err) s.Equal(mockMS1, ctx.(*workflow.ContextImpl).MutableState) @@ -129,7 +129,7 @@ func (s *workflowCacheSuite) TestHistoryCacheBasic() { context.Background(), namespaceID, execution2, - workflow.CallerTypeAPI, + workflow.LockPriorityHigh, ) s.Nil(err) s.NotEqual(mockMS1, ctx.(*workflow.ContextImpl).MutableState) @@ -149,7 +149,7 @@ func (s *workflowCacheSuite) TestHistoryCachePinning() { context.Background(), namespaceID, we, - workflow.CallerTypeAPI, + workflow.LockPriorityHigh, ) s.Nil(err) @@ -163,7 +163,7 @@ func (s *workflowCacheSuite) TestHistoryCachePinning() { context.Background(), namespaceID, we2, - workflow.CallerTypeAPI, + workflow.LockPriorityHigh, ) s.NotNil(err2) @@ -174,7 +174,7 @@ func (s *workflowCacheSuite) TestHistoryCachePinning() { context.Background(), namespaceID, we2, - workflow.CallerTypeAPI, + workflow.LockPriorityHigh, ) s.Nil(err3) release2(err3) @@ -184,7 +184,7 @@ func (s *workflowCacheSuite) TestHistoryCachePinning() { context.Background(), namespaceID, we, - workflow.CallerTypeAPI, + workflow.LockPriorityHigh, ) s.Nil(err4) s.False(ctx == newContext) @@ -204,7 +204,7 @@ func (s *workflowCacheSuite) TestHistoryCacheClear() { context.Background(), namespaceID, we, - workflow.CallerTypeAPI, + workflow.LockPriorityHigh, ) s.Nil(err) // since we are just testing whether the release function will clear the cache @@ -220,7 +220,7 @@ func (s *workflowCacheSuite) TestHistoryCacheClear() { context.Background(), namespaceID, we, - workflow.CallerTypeAPI, + workflow.LockPriorityHigh, ) s.Nil(err) @@ -234,7 +234,7 @@ func (s *workflowCacheSuite) TestHistoryCacheClear() { context.Background(), namespaceID, we, - workflow.CallerTypeAPI, + workflow.LockPriorityHigh, ) s.Nil(err) s.Nil(ctx.(*workflow.ContextImpl).MutableState) @@ -269,7 +269,7 @@ func (s *workflowCacheSuite) TestHistoryCacheConcurrentAccess_Release() { WorkflowId: workflowId, RunId: runID, }, - workflow.CallerTypeAPI, + workflow.LockPriorityHigh, ) s.Nil(err) // since each time the is reset to nil @@ -294,7 +294,7 @@ func (s *workflowCacheSuite) TestHistoryCacheConcurrentAccess_Release() { WorkflowId: workflowId, RunId: runID, }, - workflow.CallerTypeAPI, + workflow.LockPriorityHigh, ) s.Nil(err) // since we are just testing whether the release function will clear the cache @@ -341,7 +341,7 @@ func (s *workflowCacheSuite) TestHistoryCacheConcurrentAccess_Pin() { WorkflowId: workflowID, RunId: runID, }, - workflow.CallerTypeAPI, + workflow.LockPriorityHigh, ) if err == nil { break @@ -372,7 +372,7 @@ func (s *workflowCacheSuite) TestHistoryCache_CacheLatencyMetricContext() { ctx, tests.NamespaceID, tests.WorkflowID, - workflow.CallerTypeAPI, + workflow.LockPriorityHigh, ) s.NoError(err) defer currentRelease(nil) @@ -388,7 +388,7 @@ func (s *workflowCacheSuite) TestHistoryCache_CacheLatencyMetricContext() { WorkflowId: tests.WorkflowID, RunId: tests.RunID, }, - workflow.CallerTypeAPI, + workflow.LockPriorityHigh, ) s.Nil(err) defer release(nil) diff --git a/service/history/workflow/context.go b/service/history/workflow/context.go index 0db0f12c05b..03bb14e1644 100644 --- a/service/history/workflow/context.go +++ b/service/history/workflow/context.go @@ -62,12 +62,12 @@ const ( ) const ( - CallerTypeAPI CallerType = 0 - CallerTypeTask CallerType = 1 + LockPriorityHigh LockPriority = 0 + LockPriorityLow LockPriority = 1 ) type ( - CallerType int + LockPriority int Context interface { GetWorkflowKey() definition.WorkflowKey @@ -76,8 +76,8 @@ type ( LoadExecutionStats(ctx context.Context) (*persistencespb.ExecutionStats, error) Clear() - Lock(ctx context.Context, caller CallerType) error - Unlock(caller CallerType) + Lock(ctx context.Context, lockPriority LockPriority) error + Unlock(lockPriority LockPriority) GetHistorySize() int64 SetHistorySize(size int64) @@ -186,28 +186,28 @@ func NewContext( func (c *ContextImpl) Lock( ctx context.Context, - caller CallerType, + lockPriority LockPriority, ) error { - switch caller { - case CallerTypeAPI: + switch lockPriority { + case LockPriorityHigh: return c.mutex.LockHigh(ctx) - case CallerTypeTask: + case LockPriorityLow: return c.mutex.LockLow(ctx) default: - panic(fmt.Sprintf("unknown caller type: %v", caller)) + panic(fmt.Sprintf("unknown lock priority: %v", lockPriority)) } } func (c *ContextImpl) Unlock( - caller CallerType, + lockPriority LockPriority, ) { - switch caller { - case CallerTypeAPI: + switch lockPriority { + case LockPriorityHigh: c.mutex.UnlockHigh() - case CallerTypeTask: + case LockPriorityLow: c.mutex.UnlockLow() default: - panic(fmt.Sprintf("unknown caller type: %v", caller)) + panic(fmt.Sprintf("unknown lock priority: %v", lockPriority)) } } diff --git a/service/history/workflow/context_mock.go b/service/history/workflow/context_mock.go index 9777556d77e..609279bc461 100644 --- a/service/history/workflow/context_mock.go +++ b/service/history/workflow/context_mock.go @@ -161,17 +161,17 @@ func (mr *MockContextMockRecorder) LoadMutableState(ctx interface{}) *gomock.Cal } // Lock mocks base method. -func (m *MockContext) Lock(ctx context.Context, caller CallerType) error { +func (m *MockContext) Lock(ctx context.Context, lockPriority LockPriority) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Lock", ctx, caller) + ret := m.ctrl.Call(m, "Lock", ctx, lockPriority) ret0, _ := ret[0].(error) return ret0 } // Lock indicates an expected call of Lock. -func (mr *MockContextMockRecorder) Lock(ctx, caller interface{}) *gomock.Call { +func (mr *MockContextMockRecorder) Lock(ctx, lockPriority interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Lock", reflect.TypeOf((*MockContext)(nil).Lock), ctx, caller) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Lock", reflect.TypeOf((*MockContext)(nil).Lock), ctx, lockPriority) } // PersistWorkflowEvents mocks base method. @@ -230,15 +230,15 @@ func (mr *MockContextMockRecorder) SetWorkflowExecution(ctx interface{}) *gomock } // Unlock mocks base method. -func (m *MockContext) Unlock(caller CallerType) { +func (m *MockContext) Unlock(lockPriority LockPriority) { m.ctrl.T.Helper() - m.ctrl.Call(m, "Unlock", caller) + m.ctrl.Call(m, "Unlock", lockPriority) } // Unlock indicates an expected call of Unlock. -func (mr *MockContextMockRecorder) Unlock(caller interface{}) *gomock.Call { +func (mr *MockContextMockRecorder) Unlock(lockPriority interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Unlock", reflect.TypeOf((*MockContext)(nil).Unlock), caller) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Unlock", reflect.TypeOf((*MockContext)(nil).Unlock), lockPriority) } // UpdateRegistry mocks base method. diff --git a/service/history/workflowRebuilder.go b/service/history/workflowRebuilder.go index a95c293dc91..5ac487a1374 100644 --- a/service/history/workflowRebuilder.go +++ b/service/history/workflowRebuilder.go @@ -85,6 +85,7 @@ func (r *workflowRebuilderImpl) rebuild( nil, api.BypassMutableStateConsistencyPredicate, workflowKey, + workflow.LockPriorityHigh, ) if err != nil { return err diff --git a/service/history/workflowTaskHandlerCallbacks.go b/service/history/workflowTaskHandlerCallbacks.go index 517539e1165..e09b418cf7e 100644 --- a/service/history/workflowTaskHandlerCallbacks.go +++ b/service/history/workflowTaskHandlerCallbacks.go @@ -378,6 +378,7 @@ func (handler *workflowTaskHandlerCallbacksImpl) handleWorkflowTaskCompleted( token.WorkflowId, token.RunId, ), + workflow.LockPriorityHigh, ) if err != nil { return nil, err @@ -748,6 +749,7 @@ func (handler *workflowTaskHandlerCallbacksImpl) verifyFirstWorkflowTaskSchedule req.WorkflowExecution.WorkflowId, req.WorkflowExecution.RunId, ), + workflow.LockPriorityLow, ) if err != nil { return err