diff --git a/service/history/api/startworkflow/api.go b/service/history/api/startworkflow/api.go index f5f1e7006ec..020a7a9ea3d 100644 --- a/service/history/api/startworkflow/api.go +++ b/service/history/api/startworkflow/api.go @@ -45,6 +45,7 @@ import ( "go.temporal.io/server/service/history/consts" "go.temporal.io/server/service/history/shard" "go.temporal.io/server/service/history/workflow" + "go.temporal.io/server/service/history/workflow/cache" ) type eagerStartDeniedReason metrics.ReasonString @@ -166,6 +167,14 @@ func (s *Starter) Invoke( if err != nil { return nil, err } + + // grab current workflow context as a lock so that user latency can be computed + currentRelease, err := s.lockCurrentWorkflowExecution(ctx) + if err != nil { + return nil, err + } + defer func() { currentRelease(retError) }() + err = s.createBrandNew(ctx, creationParams) if err == nil { return s.generateResponse(creationParams.runID, creationParams.workflowTaskInfo, extractHistoryEvents(creationParams.workflowEventBatches)) @@ -178,6 +187,21 @@ func (s *Starter) Invoke( return s.handleConflict(ctx, creationParams, currentWorkflowConditionFailedError) } +func (s *Starter) lockCurrentWorkflowExecution( + ctx context.Context, +) (cache.ReleaseCacheFunc, error) { + _, currentRelease, err := s.workflowConsistencyChecker.GetWorkflowCache().GetOrCreateCurrentWorkflowExecution( + ctx, + s.namespace.ID(), + s.request.StartRequest.WorkflowId, + workflow.CallerTypeAPI, + ) + if err != nil { + return nil, err + } + return currentRelease, nil +} + // createNewMutableState creates a new workflow context, and closes its mutable state transaction as snapshot. // It returns the creationContext which can later be used to insert into the executions table. func (s *Starter) createNewMutableState(ctx context.Context, workflowID string, runID string) (*creationParams, error) { diff --git a/service/history/workflow/cache/cache.go b/service/history/workflow/cache/cache.go index bb8e07be39d..ad07adf908d 100644 --- a/service/history/workflow/cache/cache.go +++ b/service/history/workflow/cache/cache.go @@ -55,6 +55,13 @@ type ( ReleaseCacheFunc func(err error) Cache interface { + GetOrCreateCurrentWorkflowExecution( + ctx context.Context, + namespaceID namespace.ID, + workflowID string, + caller workflow.CallerType, + ) (workflow.Context, ReleaseCacheFunc, error) + GetOrCreateWorkflowExecution( ctx context.Context, namespaceID namespace.ID, @@ -97,6 +104,42 @@ func NewCache(shard shard.Context) Cache { } } +func (c *CacheImpl) GetOrCreateCurrentWorkflowExecution( + ctx context.Context, + namespaceID namespace.ID, + workflowID string, + caller workflow.CallerType, +) (workflow.Context, ReleaseCacheFunc, error) { + + if err := c.validateWorkflowID(workflowID); err != nil { + return nil, nil, err + } + + handler := c.metricsHandler.WithTags(metrics.OperationTag(metrics.HistoryCacheGetOrCreateCurrentScope)) + handler.Counter(metrics.CacheRequests.GetMetricName()).Record(1) + start := time.Now() + defer func() { handler.Timer(metrics.CacheLatency.GetMetricName()).Record(time.Since(start)) }() + + execution := commonpb.WorkflowExecution{ + WorkflowId: workflowID, + // using empty run ID as current workflow run ID + RunId: "", + } + + weCtx, weReleaseFn, err := c.getOrCreateWorkflowExecutionInternal( + ctx, + namespaceID, + execution, + handler, + true, + caller, + ) + + metrics.ContextCounterAdd(ctx, metrics.HistoryWorkflowExecutionCacheLatency.GetMetricName(), time.Since(start).Nanoseconds()) + + return weCtx, weReleaseFn, err +} + func (c *CacheImpl) GetOrCreateWorkflowExecution( ctx context.Context, namespaceID namespace.ID, @@ -197,13 +240,8 @@ func (c *CacheImpl) validateWorkflowExecutionInfo( execution *commonpb.WorkflowExecution, ) error { - if execution.GetWorkflowId() == "" { - return serviceerror.NewInvalidArgument("Can't load workflow execution. WorkflowId not set.") - } - - if !utf8.ValidString(execution.GetWorkflowId()) { - // We know workflow cannot exist with invalid utf8 string as WorkflowID. - return serviceerror.NewNotFound("Workflow not exists.") + if err := c.validateWorkflowID(execution.GetWorkflowId()); err != nil { + return err } // RunID is not provided, lets try to retrieve the RunID for current active execution @@ -224,3 +262,18 @@ func (c *CacheImpl) validateWorkflowExecutionInfo( } return nil } + +func (c *CacheImpl) validateWorkflowID( + workflowID string, +) error { + if workflowID == "" { + return serviceerror.NewInvalidArgument("Can't load workflow execution. WorkflowId not set.") + } + + if !utf8.ValidString(workflowID) { + // We know workflow cannot exist with invalid utf8 string as WorkflowID. + return serviceerror.NewNotFound("Workflow not exists.") + } + + return nil +} diff --git a/service/history/workflow/cache/cache_mock.go b/service/history/workflow/cache/cache_mock.go index 523ae155470..59233f30b6d 100644 --- a/service/history/workflow/cache/cache_mock.go +++ b/service/history/workflow/cache/cache_mock.go @@ -61,6 +61,22 @@ func (m *MockCache) EXPECT() *MockCacheMockRecorder { return m.recorder } +// GetOrCreateCurrentWorkflowExecution mocks base method. +func (m *MockCache) GetOrCreateCurrentWorkflowExecution(ctx context.Context, namespaceID namespace.ID, workflowID string, caller workflow.CallerType) (workflow.Context, ReleaseCacheFunc, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetOrCreateCurrentWorkflowExecution", ctx, namespaceID, workflowID, caller) + ret0, _ := ret[0].(workflow.Context) + ret1, _ := ret[1].(ReleaseCacheFunc) + ret2, _ := ret[2].(error) + return ret0, ret1, ret2 +} + +// GetOrCreateCurrentWorkflowExecution indicates an expected call of GetOrCreateCurrentWorkflowExecution. +func (mr *MockCacheMockRecorder) GetOrCreateCurrentWorkflowExecution(ctx, namespaceID, workflowID, caller interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetOrCreateCurrentWorkflowExecution", reflect.TypeOf((*MockCache)(nil).GetOrCreateCurrentWorkflowExecution), ctx, namespaceID, workflowID, caller) +} + // GetOrCreateWorkflowExecution mocks base method. func (m *MockCache) GetOrCreateWorkflowExecution(ctx context.Context, namespaceID namespace.ID, execution v1.WorkflowExecution, caller workflow.CallerType) (workflow.Context, ReleaseCacheFunc, error) { m.ctrl.T.Helper() diff --git a/service/history/workflow/cache/cache_test.go b/service/history/workflow/cache/cache_test.go index 78cbc44b5c4..18a5c165d1d 100644 --- a/service/history/workflow/cache/cache_test.go +++ b/service/history/workflow/cache/cache_test.go @@ -41,6 +41,7 @@ import ( persistencespb "go.temporal.io/server/api/persistence/v1" "go.temporal.io/server/common/dynamicconfig" + "go.temporal.io/server/common/metrics" "go.temporal.io/server/common/namespace" "go.temporal.io/server/service/history/shard" "go.temporal.io/server/service/history/tests" @@ -365,3 +366,38 @@ func (s *workflowCacheSuite) TestHistoryCacheConcurrentAccess_Pin() { } stopGroup.Wait() } + +func (s *workflowCacheSuite) TestHistoryCache_CacheLatencyMetricContext() { + s.cache = NewCache(s.mockShard) + + ctx := metrics.AddMetricsContext(context.Background()) + _, currentRelease, err := s.cache.GetOrCreateCurrentWorkflowExecution( + ctx, + tests.NamespaceID, + tests.WorkflowID, + workflow.CallerTypeAPI, + ) + s.NoError(err) + defer currentRelease(nil) + + latency1, ok := metrics.ContextCounterGet(ctx, metrics.HistoryWorkflowExecutionCacheLatency.GetMetricName()) + s.True(ok) + s.NotZero(latency1) + + _, release, err := s.cache.GetOrCreateWorkflowExecution( + ctx, + tests.NamespaceID, + commonpb.WorkflowExecution{ + WorkflowId: tests.WorkflowID, + RunId: tests.RunID, + }, + workflow.CallerTypeAPI, + ) + s.Nil(err) + defer release(nil) + + latency2, ok := metrics.ContextCounterGet(ctx, metrics.HistoryWorkflowExecutionCacheLatency.GetMetricName()) + s.True(ok) + s.Greater(latency2, latency1) + +}