From 9a4f19e15d0e8df1179707e7dafb3ed92e28a1da Mon Sep 17 00:00:00 2001 From: wxing1292 Date: Mon, 30 Jan 2023 11:25:20 -0800 Subject: [PATCH] Mark state transition & completion metrics with namespace activeness (#3831) --- common/metrics/tags.go | 30 ++-- service/history/historyEngine2_test.go | 2 + service/history/ndc/workflow_resetter.go | 3 + service/history/ndc/workflow_resetter_test.go | 25 +++- .../timerQueueActiveTaskExecutor_test.go | 5 +- .../timerQueueStandbyTaskExecutor_test.go | 5 +- .../transferQueueActiveTaskExecutor_test.go | 5 +- service/history/workflow/context.go | 102 ++++++++++--- service/history/workflow/metrics.go | 2 + service/history/workflow/transaction.go | 6 + service/history/workflow/transaction_impl.go | 141 +++++++++++++----- service/history/workflow/transaction_mock.go | 24 +-- service/history/workflow/transaction_test.go | 7 + 13 files changed, 264 insertions(+), 93 deletions(-) diff --git a/common/metrics/tags.go b/common/metrics/tags.go index b2500870433..5f8bc591ccf 100644 --- a/common/metrics/tags.go +++ b/common/metrics/tags.go @@ -41,15 +41,16 @@ const ( buildPlatformTag = "build_platform" goVersionTag = "go_version" - instance = "instance" - namespace = "namespace" - targetCluster = "target_cluster" - taskQueue = "taskqueue" - workflowType = "workflowType" - activityType = "activityType" - commandType = "commandType" - serviceName = "service_name" - actionType = "action_type" + instance = "instance" + namespace = "namespace" + namespaceState = "namespace_state" + targetCluster = "target_cluster" + taskQueue = "taskqueue" + workflowType = "workflowType" + activityType = "activityType" + commandType = "commandType" + serviceName = "service_name" + actionType = "action_type" namespaceAllValue = "all" unknownValue = "_unknown_" @@ -103,6 +104,17 @@ func NamespaceUnknownTag() Tag { return namespaceUnknownTag } +// NamespaceStateTag returns a new namespace state tag. +func NamespaceStateTag(value string) Tag { + if len(value) == 0 { + value = unknownValue + } + return &tagImpl{ + key: namespaceState, + value: value, + } +} + var taskQueueUnknownTag = &tagImpl{key: taskQueue, value: unknownValue} // TaskQueueUnknownTag returns a new taskqueue:unknown tag-value diff --git a/service/history/historyEngine2_test.go b/service/history/historyEngine2_test.go index eed7ba049ee..b84db2a3822 100644 --- a/service/history/historyEngine2_test.go +++ b/service/history/historyEngine2_test.go @@ -153,6 +153,8 @@ func (s *engine2Suite) SetupTest() { s.mockNamespaceCache.EXPECT().GetNamespaceByID(tests.ParentNamespaceID).Return(tests.GlobalParentNamespaceEntry, nil).AnyTimes() s.mockNamespaceCache.EXPECT().GetNamespace(tests.ChildNamespace).Return(tests.GlobalChildNamespaceEntry, nil).AnyTimes() s.mockEventsCache.EXPECT().PutEvent(gomock.Any(), gomock.Any()).AnyTimes() + s.mockClusterMetadata.EXPECT().GetClusterID().Return(tests.Version).AnyTimes() + s.mockClusterMetadata.EXPECT().IsVersionFromSameCluster(tests.Version, tests.Version).Return(true).AnyTimes() s.mockClusterMetadata.EXPECT().IsGlobalNamespaceEnabled().Return(false).AnyTimes() s.mockClusterMetadata.EXPECT().GetCurrentClusterName().Return(cluster.TestCurrentClusterName).AnyTimes() s.mockClusterMetadata.EXPECT().ClusterNameForFailoverVersion(false, common.EmptyVersion).Return(cluster.TestCurrentClusterName).AnyTimes() diff --git a/service/history/ndc/workflow_resetter.go b/service/history/ndc/workflow_resetter.go index e66c6396984..c4ea5f1df66 100644 --- a/service/history/ndc/workflow_resetter.go +++ b/service/history/ndc/workflow_resetter.go @@ -35,6 +35,7 @@ import ( enumspb "go.temporal.io/api/enums/v1" historypb "go.temporal.io/api/history/v1" "go.temporal.io/api/serviceerror" + persistencespb "go.temporal.io/server/api/persistence/v1" "go.temporal.io/server/common" "go.temporal.io/server/common/cluster" @@ -364,8 +365,10 @@ func (r *workflowResetterImpl) persistToDB( if currentWorkflowSizeDiff, resetWorkflowSizeDiff, err := r.transaction.UpdateWorkflowExecution( ctx, persistence.UpdateWorkflowModeUpdateCurrent, + currentWorkflow.GetMutableState().GetCurrentVersion(), currentWorkflowMutation, currentWorkflowEventsSeq, + workflow.MutableStateFailoverVersion(resetWorkflow.GetMutableState()), resetWorkflowSnapshot, resetWorkflowEventsSeq, ); err != nil { diff --git a/service/history/ndc/workflow_resetter_test.go b/service/history/ndc/workflow_resetter_test.go index 67cae6d058e..2db4cfd5cda 100644 --- a/service/history/ndc/workflow_resetter_test.go +++ b/service/history/ndc/workflow_resetter_test.go @@ -37,6 +37,8 @@ import ( enumspb "go.temporal.io/api/enums/v1" historypb "go.temporal.io/api/history/v1" taskqueuepb "go.temporal.io/api/taskqueue/v1" + + historyspb "go.temporal.io/server/api/history/v1" persistencespb "go.temporal.io/server/api/persistence/v1" "go.temporal.io/server/common" "go.temporal.io/server/common/collection" @@ -47,6 +49,7 @@ import ( "go.temporal.io/server/common/namespace" "go.temporal.io/server/common/payloads" "go.temporal.io/server/common/persistence" + "go.temporal.io/server/common/persistence/versionhistory" "go.temporal.io/server/common/primitives/timestamp" "go.temporal.io/server/service/history/consts" "go.temporal.io/server/service/history/shard" @@ -139,10 +142,18 @@ func (s *workflowResetterSuite) TestPersistToDB_CurrentTerminated() { currentWorkflow.EXPECT().GetMutableState().Return(currentMutableState).AnyTimes() currentWorkflow.EXPECT().GetReleaseFn().Return(currentReleaseFn).AnyTimes() + currentMutableState.EXPECT().GetCurrentVersion().Return(int64(0)).AnyTimes() currentEventsSize := int64(2333) currentNewEventsSize := int64(3444) currentMutation := &persistence.WorkflowMutation{ - ExecutionInfo: &persistencespb.WorkflowExecutionInfo{}, + ExecutionInfo: &persistencespb.WorkflowExecutionInfo{ + VersionHistories: versionhistory.NewVersionHistories(&historyspb.VersionHistory{ + BranchToken: []byte{1, 2, 3}, + Items: []*historyspb.VersionHistoryItem{ + {EventId: 234, Version: 0}, + }, + }), + }, } currentEventsSeq := []*persistence.WorkflowEvents{{ NamespaceID: s.namespaceID.String(), @@ -165,10 +176,18 @@ func (s *workflowResetterSuite) TestPersistToDB_CurrentTerminated() { resetWorkflow.EXPECT().GetMutableState().Return(resetMutableState).AnyTimes() resetWorkflow.EXPECT().GetReleaseFn().Return(tarGetReleaseFn).AnyTimes() + resetMutableState.EXPECT().GetCurrentVersion().Return(int64(0)).AnyTimes() resetEventsSize := int64(1444) resetNewEventsSize := int64(4321) resetSnapshot := &persistence.WorkflowSnapshot{ - ExecutionInfo: &persistencespb.WorkflowExecutionInfo{}, + ExecutionInfo: &persistencespb.WorkflowExecutionInfo{ + VersionHistories: versionhistory.NewVersionHistories(&historyspb.VersionHistory{ + BranchToken: []byte{1, 2, 3}, + Items: []*historyspb.VersionHistoryItem{ + {EventId: 123, Version: 0}, + }, + }), + }, } resetEventsSeq := []*persistence.WorkflowEvents{{ NamespaceID: s.namespaceID.String(), @@ -189,8 +208,10 @@ func (s *workflowResetterSuite) TestPersistToDB_CurrentTerminated() { s.mockTransaction.EXPECT().UpdateWorkflowExecution( gomock.Any(), persistence.UpdateWorkflowModeUpdateCurrent, + int64(0), currentMutation, currentEventsSeq, + convert.Int64Ptr(0), resetSnapshot, resetEventsSeq, ).Return(currentNewEventsSize, resetNewEventsSize, nil) diff --git a/service/history/timerQueueActiveTaskExecutor_test.go b/service/history/timerQueueActiveTaskExecutor_test.go index 65b7f969b64..3dd3f8f24ed 100644 --- a/service/history/timerQueueActiveTaskExecutor_test.go +++ b/service/history/timerQueueActiveTaskExecutor_test.go @@ -54,7 +54,7 @@ import ( "go.temporal.io/server/common/persistence" "go.temporal.io/server/common/persistence/versionhistory" "go.temporal.io/server/common/primitives/timestamp" - deletemanager "go.temporal.io/server/service/history/deletemanager" + "go.temporal.io/server/service/history/deletemanager" "go.temporal.io/server/service/history/events" "go.temporal.io/server/service/history/queues" "go.temporal.io/server/service/history/shard" @@ -154,7 +154,8 @@ func (s *timerQueueActiveTaskExecutorSuite) SetupTest() { // ack manager will use the namespace information s.mockNamespaceCache.EXPECT().GetNamespaceByID(gomock.Any()).Return(tests.GlobalNamespaceEntry, nil).AnyTimes() s.mockNamespaceCache.EXPECT().GetNamespaceName(gomock.Any()).Return(tests.Namespace, nil).AnyTimes() - s.mockClusterMetadata.EXPECT().GetClusterID().Return(cluster.TestCurrentClusterInitialFailoverVersion).AnyTimes() + s.mockClusterMetadata.EXPECT().GetClusterID().Return(tests.Version).AnyTimes() + s.mockClusterMetadata.EXPECT().IsVersionFromSameCluster(tests.Version, tests.Version).Return(true).AnyTimes() s.mockClusterMetadata.EXPECT().GetCurrentClusterName().Return(cluster.TestCurrentClusterName).AnyTimes() s.mockClusterMetadata.EXPECT().GetAllClusterInfo().Return(cluster.TestAllClusterInfo).AnyTimes() s.mockClusterMetadata.EXPECT().IsGlobalNamespaceEnabled().Return(true).AnyTimes() diff --git a/service/history/timerQueueStandbyTaskExecutor_test.go b/service/history/timerQueueStandbyTaskExecutor_test.go index 2448d1366d8..13518860409 100644 --- a/service/history/timerQueueStandbyTaskExecutor_test.go +++ b/service/history/timerQueueStandbyTaskExecutor_test.go @@ -58,7 +58,7 @@ import ( "go.temporal.io/server/common/primitives/timestamp" "go.temporal.io/server/common/xdc" "go.temporal.io/server/service/history/consts" - deletemanager "go.temporal.io/server/service/history/deletemanager" + "go.temporal.io/server/service/history/deletemanager" "go.temporal.io/server/service/history/events" "go.temporal.io/server/service/history/queues" "go.temporal.io/server/service/history/shard" @@ -159,7 +159,8 @@ func (s *timerQueueStandbyTaskExecutorSuite) SetupTest() { s.mockMatchingClient = s.mockShard.Resource.MatchingClient s.mockNamespaceCache.EXPECT().GetNamespaceByID(gomock.Any()).Return(s.namespaceEntry, nil).AnyTimes() s.mockNamespaceCache.EXPECT().GetNamespaceName(gomock.Any()).Return(s.namespaceEntry.Name(), nil).AnyTimes() - s.mockClusterMetadata.EXPECT().GetClusterID().Return(cluster.TestCurrentClusterInitialFailoverVersion).AnyTimes() + s.mockClusterMetadata.EXPECT().GetClusterID().Return(tests.Version).AnyTimes() + s.mockClusterMetadata.EXPECT().IsVersionFromSameCluster(tests.Version, tests.Version).Return(true).AnyTimes() s.mockClusterMetadata.EXPECT().GetCurrentClusterName().Return(cluster.TestCurrentClusterName).AnyTimes() s.mockClusterMetadata.EXPECT().GetAllClusterInfo().Return(cluster.TestAllClusterInfo).AnyTimes() s.mockClusterMetadata.EXPECT().IsGlobalNamespaceEnabled().Return(true).AnyTimes() diff --git a/service/history/transferQueueActiveTaskExecutor_test.go b/service/history/transferQueueActiveTaskExecutor_test.go index 40cc09a23b6..108fefdf321 100644 --- a/service/history/transferQueueActiveTaskExecutor_test.go +++ b/service/history/transferQueueActiveTaskExecutor_test.go @@ -71,7 +71,7 @@ import ( "go.temporal.io/server/common/searchattribute" "go.temporal.io/server/service/history/configs" "go.temporal.io/server/service/history/consts" - deletemanager "go.temporal.io/server/service/history/deletemanager" + "go.temporal.io/server/service/history/deletemanager" "go.temporal.io/server/service/history/events" "go.temporal.io/server/service/history/queues" "go.temporal.io/server/service/history/shard" @@ -199,7 +199,8 @@ func (s *transferQueueActiveTaskExecutorSuite) SetupTest() { s.mockNamespaceCache.EXPECT().GetNamespaceByID(tests.ChildNamespaceID).Return(tests.GlobalChildNamespaceEntry, nil).AnyTimes() s.mockNamespaceCache.EXPECT().GetNamespace(tests.ChildNamespace).Return(tests.GlobalChildNamespaceEntry, nil).AnyTimes() s.mockNamespaceCache.EXPECT().GetNamespaceByID(tests.MissedNamespaceID).Return(nil, serviceerror.NewNamespaceNotFound(tests.MissedNamespaceID.String())).AnyTimes() - s.mockClusterMetadata.EXPECT().GetClusterID().Return(cluster.TestCurrentClusterInitialFailoverVersion).AnyTimes() + s.mockClusterMetadata.EXPECT().GetClusterID().Return(tests.Version).AnyTimes() + s.mockClusterMetadata.EXPECT().IsVersionFromSameCluster(tests.Version, tests.Version).Return(true).AnyTimes() s.mockClusterMetadata.EXPECT().GetCurrentClusterName().Return(cluster.TestCurrentClusterName).AnyTimes() s.mockClusterMetadata.EXPECT().GetAllClusterInfo().Return(cluster.TestAllClusterInfo).AnyTimes() s.mockClusterMetadata.EXPECT().IsGlobalNamespaceEnabled().Return(true).AnyTimes() diff --git a/service/history/workflow/context.go b/service/history/workflow/context.go index 526f06ad670..da90e8de0fe 100644 --- a/service/history/workflow/context.go +++ b/service/history/workflow/context.go @@ -40,6 +40,8 @@ import ( persistencespb "go.temporal.io/server/api/persistence/v1" "go.temporal.io/server/common" "go.temporal.io/server/common/clock" + "go.temporal.io/server/common/cluster" + "go.temporal.io/server/common/convert" "go.temporal.io/server/common/definition" "go.temporal.io/server/common/locks" "go.temporal.io/server/common/log" @@ -147,13 +149,14 @@ type ( type ( ContextImpl struct { - shard shard.Context - workflowKey definition.WorkflowKey - logger log.Logger - metricsHandler metrics.Handler - timeSource clock.TimeSource - config *configs.Config - transaction Transaction + shard shard.Context + workflowKey definition.WorkflowKey + logger log.Logger + metricsHandler metrics.Handler + clusterMetadata cluster.Metadata + timeSource clock.TimeSource + config *configs.Config + transaction Transaction mutex locks.PriorityMutex MutableState MutableState @@ -169,14 +172,15 @@ func NewContext( logger log.Logger, ) *ContextImpl { return &ContextImpl{ - shard: shard, - workflowKey: workflowKey, - logger: logger, - metricsHandler: shard.GetMetricsHandler().WithTags(metrics.OperationTag(metrics.WorkflowContextScope)), - timeSource: shard.GetTimeSource(), - config: shard.GetConfig(), - mutex: locks.NewPriorityMutex(), - transaction: NewTransaction(shard), + shard: shard, + workflowKey: workflowKey, + logger: logger, + metricsHandler: shard.GetMetricsHandler().WithTags(metrics.OperationTag(metrics.WorkflowContextScope)), + clusterMetadata: shard.GetClusterMetadata(), + timeSource: shard.GetTimeSource(), + config: shard.GetConfig(), + mutex: locks.NewPriorityMutex(), + transaction: NewTransaction(shard), stats: &persistencespb.ExecutionStats{ HistorySize: 0, }, @@ -349,6 +353,7 @@ func (c *ContextImpl) CreateWorkflowExecution( resp, err := createWorkflowExecution( ctx, c.shard, + newMutableState.GetCurrentVersion(), createRequest, ) if err != nil { @@ -361,7 +366,7 @@ func (c *ContextImpl) CreateWorkflowExecution( return err } NotifyWorkflowSnapshotTasks(engine, newWorkflow) - emitStateTransitionCount(c.metricsHandler, newMutableState) + emitStateTransitionCount(c.metricsHandler, c.clusterMetadata, newMutableState) return nil } @@ -451,10 +456,13 @@ func (c *ContextImpl) ConflictResolveWorkflowExecution( if resetWorkflowSizeDiff, newWorkflowSizeDiff, currentWorkflowSizeDiff, err := c.transaction.ConflictResolveWorkflowExecution( ctx, conflictResolveMode, + resetMutableState.GetCurrentVersion(), resetWorkflow, resetWorkflowEventsSeq, + MutableStateFailoverVersion(newMutableState), newWorkflow, newWorkflowEventsSeq, + MutableStateFailoverVersion(currentMutableState), currentWorkflow, currentWorkflowEventsSeq, ); err != nil { @@ -469,9 +477,9 @@ func (c *ContextImpl) ConflictResolveWorkflowExecution( } } - emitStateTransitionCount(c.metricsHandler, resetMutableState) - emitStateTransitionCount(c.metricsHandler, newMutableState) - emitStateTransitionCount(c.metricsHandler, currentMutableState) + emitStateTransitionCount(c.metricsHandler, c.clusterMetadata, resetMutableState) + emitStateTransitionCount(c.metricsHandler, c.clusterMetadata, newMutableState) + emitStateTransitionCount(c.metricsHandler, c.clusterMetadata, currentMutableState) return nil } @@ -628,8 +636,10 @@ func (c *ContextImpl) UpdateWorkflowExecutionWithNew( if currentWorkflowSizeDiff, newWorkflowSizeDiff, err := c.transaction.UpdateWorkflowExecution( ctx, updateMode, + c.MutableState.GetCurrentVersion(), currentWorkflow, currentWorkflowEventsSeq, + MutableStateFailoverVersion(newMutableState), newWorkflow, newWorkflowEventsSeq, ); err != nil { @@ -641,8 +651,8 @@ func (c *ContextImpl) UpdateWorkflowExecutionWithNew( } } - emitStateTransitionCount(c.metricsHandler, c.MutableState) - emitStateTransitionCount(c.metricsHandler, newMutableState) + emitStateTransitionCount(c.metricsHandler, c.clusterMetadata, c.MutableState) + emitStateTransitionCount(c.metricsHandler, c.clusterMetadata, newMutableState) // finally emit session stats namespace := c.GetNamespace() @@ -922,12 +932,58 @@ func (c *ContextImpl) enforceSizeCheck( func emitStateTransitionCount( metricsHandler metrics.Handler, + clusterMetadata cluster.Metadata, mutableState MutableState, ) { if mutableState == nil { return } - metricsHandler.Histogram(metrics.StateTransitionCount.GetMetricName(), metrics.StateTransitionCount.GetMetricUnit()). - Record(mutableState.GetExecutionInfo().StateTransitionCount, metrics.NamespaceTag(mutableState.GetNamespaceEntry().Name().String())) + namespaceEntry := mutableState.GetNamespaceEntry() + metricsHandler.Histogram( + metrics.StateTransitionCount.GetMetricName(), + metrics.StateTransitionCount.GetMetricUnit(), + ).Record( + mutableState.GetExecutionInfo().StateTransitionCount, + metrics.NamespaceTag(namespaceEntry.Name().String()), + metrics.NamespaceStateTag(namespaceState(clusterMetadata, convert.Int64Ptr(mutableState.GetCurrentVersion()))), + ) +} + +const ( + namespaceStateActive = "active" + namespaceStatePassive = "passive" + namespaceStateUnknown = "_unknown_" +) + +func namespaceState( + clusterMetadata cluster.Metadata, + mutableStateCurrentVersion *int64, +) string { + + if mutableStateCurrentVersion == nil { + return namespaceStateUnknown + } + + // default value, need to special handle + if *mutableStateCurrentVersion == 0 { + return namespaceStateActive + } + + if clusterMetadata.IsVersionFromSameCluster( + clusterMetadata.GetClusterID(), + *mutableStateCurrentVersion, + ) { + return namespaceStateActive + } + return namespaceStatePassive +} + +func MutableStateFailoverVersion( + mutableState MutableState, +) *int64 { + if mutableState == nil { + return nil + } + return convert.Int64Ptr(mutableState.GetCurrentVersion()) } diff --git a/service/history/workflow/metrics.go b/service/history/workflow/metrics.go index 88f0edb6aaa..a4e5ffc99bf 100644 --- a/service/history/workflow/metrics.go +++ b/service/history/workflow/metrics.go @@ -83,12 +83,14 @@ func emitMutableStateStatus( func emitWorkflowCompletionStats( metricsHandler metrics.Handler, namespace namespace.Name, + namespaceState string, taskQueue string, status enumspb.WorkflowExecutionStatus, ) { handler := metricsHandler.WithTags( metrics.OperationTag(metrics.WorkflowCompletionStatsScope), metrics.NamespaceTag(namespace.String()), + metrics.NamespaceStateTag(namespaceState), metrics.TaskQueueTag(taskQueue), ) diff --git a/service/history/workflow/transaction.go b/service/history/workflow/transaction.go index d5b832a26a4..29a103ce6aa 100644 --- a/service/history/workflow/transaction.go +++ b/service/history/workflow/transaction.go @@ -36,6 +36,7 @@ type ( CreateWorkflowExecution( ctx context.Context, createMode persistence.CreateWorkflowMode, + newWorkflowFailoverVersion int64, newWorkflowSnapshot *persistence.WorkflowSnapshot, newWorkflowEventsSeq []*persistence.WorkflowEvents, ) (int64, error) @@ -43,10 +44,13 @@ type ( ConflictResolveWorkflowExecution( ctx context.Context, conflictResolveMode persistence.ConflictResolveWorkflowMode, + resetWorkflowFailoverVersion int64, resetWorkflowSnapshot *persistence.WorkflowSnapshot, resetWorkflowEventsSeq []*persistence.WorkflowEvents, + newWorkflowFailoverVersion *int64, newWorkflowSnapshot *persistence.WorkflowSnapshot, newWorkflowEventsSeq []*persistence.WorkflowEvents, + currentWorkflowFailoverVersion *int64, currentWorkflowMutation *persistence.WorkflowMutation, currentWorkflowEventsSeq []*persistence.WorkflowEvents, ) (int64, int64, int64, error) @@ -54,8 +58,10 @@ type ( UpdateWorkflowExecution( ctx context.Context, updateMode persistence.UpdateWorkflowMode, + currentWorkflowFailoverVersion int64, currentWorkflowMutation *persistence.WorkflowMutation, currentWorkflowEventsSeq []*persistence.WorkflowEvents, + newWorkflowFailoverVersion *int64, newWorkflowSnapshot *persistence.WorkflowSnapshot, newWorkflowEventsSeq []*persistence.WorkflowEvents, ) (int64, int64, error) diff --git a/service/history/workflow/transaction_impl.go b/service/history/workflow/transaction_impl.go index 9e0ca79a058..8a359bd891e 100644 --- a/service/history/workflow/transaction_impl.go +++ b/service/history/workflow/transaction_impl.go @@ -44,9 +44,10 @@ import ( type ( completionMetric struct { - initialized bool - taskQueue string - status enumspb.WorkflowExecutionStatus + initialized bool + taskQueue string + namespaceState string + status enumspb.WorkflowExecutionStatus } TransactionImpl struct { shard shard.Context @@ -68,6 +69,7 @@ func NewTransaction( func (t *TransactionImpl) CreateWorkflowExecution( ctx context.Context, createMode persistence.CreateWorkflowMode, + newWorkflowFailoverVersion int64, newWorkflowSnapshot *persistence.WorkflowSnapshot, newWorkflowEventsSeq []*persistence.WorkflowEvents, ) (int64, error) { @@ -77,13 +79,18 @@ func (t *TransactionImpl) CreateWorkflowExecution( return 0, err } - resp, err := createWorkflowExecution(ctx, t.shard, &persistence.CreateWorkflowExecutionRequest{ - ShardID: t.shard.GetShardID(), - // RangeID , this is set by shard context - Mode: createMode, - NewWorkflowSnapshot: *newWorkflowSnapshot, - NewWorkflowEvents: newWorkflowEventsSeq, - }) + resp, err := createWorkflowExecution( + ctx, + t.shard, + newWorkflowFailoverVersion, + &persistence.CreateWorkflowExecutionRequest{ + ShardID: t.shard.GetShardID(), + // RangeID , this is set by shard context + Mode: createMode, + NewWorkflowSnapshot: *newWorkflowSnapshot, + NewWorkflowEvents: newWorkflowEventsSeq, + }, + ) if shard.OperationPossiblySucceeded(err) { NotifyWorkflowSnapshotTasks(engine, newWorkflowSnapshot) } @@ -101,10 +108,13 @@ func (t *TransactionImpl) CreateWorkflowExecution( func (t *TransactionImpl) ConflictResolveWorkflowExecution( ctx context.Context, conflictResolveMode persistence.ConflictResolveWorkflowMode, + resetWorkflowFailoverVersion int64, resetWorkflowSnapshot *persistence.WorkflowSnapshot, resetWorkflowEventsSeq []*persistence.WorkflowEvents, + newWorkflowFailoverVersion *int64, newWorkflowSnapshot *persistence.WorkflowSnapshot, newWorkflowEventsSeq []*persistence.WorkflowEvents, + currentWorkflowFailoverVersion *int64, currentWorkflowMutation *persistence.WorkflowMutation, currentWorkflowEventsSeq []*persistence.WorkflowEvents, ) (int64, int64, int64, error) { @@ -114,17 +124,24 @@ func (t *TransactionImpl) ConflictResolveWorkflowExecution( return 0, 0, 0, err } - resp, err := conflictResolveWorkflowExecution(ctx, t.shard, &persistence.ConflictResolveWorkflowExecutionRequest{ - ShardID: t.shard.GetShardID(), - // RangeID , this is set by shard context - Mode: conflictResolveMode, - ResetWorkflowSnapshot: *resetWorkflowSnapshot, - ResetWorkflowEvents: resetWorkflowEventsSeq, - NewWorkflowSnapshot: newWorkflowSnapshot, - NewWorkflowEvents: newWorkflowEventsSeq, - CurrentWorkflowMutation: currentWorkflowMutation, - CurrentWorkflowEvents: currentWorkflowEventsSeq, - }) + resp, err := conflictResolveWorkflowExecution( + ctx, + t.shard, + resetWorkflowFailoverVersion, + newWorkflowFailoverVersion, + currentWorkflowFailoverVersion, + &persistence.ConflictResolveWorkflowExecutionRequest{ + ShardID: t.shard.GetShardID(), + // RangeID , this is set by shard context + Mode: conflictResolveMode, + ResetWorkflowSnapshot: *resetWorkflowSnapshot, + ResetWorkflowEvents: resetWorkflowEventsSeq, + NewWorkflowSnapshot: newWorkflowSnapshot, + NewWorkflowEvents: newWorkflowEventsSeq, + CurrentWorkflowMutation: currentWorkflowMutation, + CurrentWorkflowEvents: currentWorkflowEventsSeq, + }, + ) if shard.OperationPossiblySucceeded(err) { NotifyWorkflowSnapshotTasks(engine, resetWorkflowSnapshot) NotifyWorkflowSnapshotTasks(engine, newWorkflowSnapshot) @@ -158,8 +175,10 @@ func (t *TransactionImpl) ConflictResolveWorkflowExecution( func (t *TransactionImpl) UpdateWorkflowExecution( ctx context.Context, updateMode persistence.UpdateWorkflowMode, + currentWorkflowFailoverVersion int64, currentWorkflowMutation *persistence.WorkflowMutation, currentWorkflowEventsSeq []*persistence.WorkflowEvents, + newWorkflowFailoverVersion *int64, newWorkflowSnapshot *persistence.WorkflowSnapshot, newWorkflowEventsSeq []*persistence.WorkflowEvents, ) (int64, int64, error) { @@ -168,15 +187,21 @@ func (t *TransactionImpl) UpdateWorkflowExecution( if err != nil { return 0, 0, err } - resp, err := updateWorkflowExecution(ctx, t.shard, &persistence.UpdateWorkflowExecutionRequest{ - ShardID: t.shard.GetShardID(), - // RangeID , this is set by shard context - Mode: updateMode, - UpdateWorkflowMutation: *currentWorkflowMutation, - UpdateWorkflowEvents: currentWorkflowEventsSeq, - NewWorkflowSnapshot: newWorkflowSnapshot, - NewWorkflowEvents: newWorkflowEventsSeq, - }) + resp, err := updateWorkflowExecution( + ctx, + t.shard, + currentWorkflowFailoverVersion, + newWorkflowFailoverVersion, + &persistence.UpdateWorkflowExecutionRequest{ + ShardID: t.shard.GetShardID(), + // RangeID , this is set by shard context + Mode: updateMode, + UpdateWorkflowMutation: *currentWorkflowMutation, + UpdateWorkflowEvents: currentWorkflowEventsSeq, + NewWorkflowSnapshot: newWorkflowSnapshot, + NewWorkflowEvents: newWorkflowEventsSeq, + }, + ) if shard.OperationPossiblySucceeded(err) { NotifyWorkflowMutationTasks(engine, currentWorkflowMutation) NotifyWorkflowSnapshotTasks(engine, newWorkflowSnapshot) @@ -326,6 +351,7 @@ func appendHistoryEvents( func createWorkflowExecution( ctx context.Context, shard shard.Context, + mutableStateFailoverVersion int64, request *persistence.CreateWorkflowExecutionRequest, ) (*persistence.CreateWorkflowExecutionResponse, error) { @@ -359,6 +385,14 @@ func createWorkflowExecution( namespaceEntry, &resp.NewMutableStateStats, ) + emitCompletionMetrics( + shard, + namespaceEntry, + snapshotToCompletionMetric( + namespaceState(shard.GetClusterMetadata(), &mutableStateFailoverVersion), + &request.NewWorkflowSnapshot, + ), + ) } return resp, nil } @@ -366,6 +400,9 @@ func createWorkflowExecution( func conflictResolveWorkflowExecution( ctx context.Context, shard shard.Context, + resetWorkflowFailoverVersion int64, + newWorkflowFailoverVersion *int64, + currentWorkflowFailoverVersion *int64, request *persistence.ConflictResolveWorkflowExecutionRequest, ) (*persistence.ConflictResolveWorkflowExecutionResponse, error) { @@ -395,9 +432,18 @@ func conflictResolveWorkflowExecution( emitCompletionMetrics( shard, namespaceEntry, - snapshotToCompletionMetric(&request.ResetWorkflowSnapshot), - snapshotToCompletionMetric(request.NewWorkflowSnapshot), - mutationToCompletionMetric(request.CurrentWorkflowMutation), + snapshotToCompletionMetric( + namespaceState(shard.GetClusterMetadata(), &resetWorkflowFailoverVersion), + &request.ResetWorkflowSnapshot, + ), + snapshotToCompletionMetric( + namespaceState(shard.GetClusterMetadata(), newWorkflowFailoverVersion), + request.NewWorkflowSnapshot, + ), + mutationToCompletionMetric( + namespaceState(shard.GetClusterMetadata(), currentWorkflowFailoverVersion), + request.CurrentWorkflowMutation, + ), ) } return resp, nil @@ -443,6 +489,8 @@ func getWorkflowExecution( func updateWorkflowExecution( ctx context.Context, shard shard.Context, + updateWorkflowFailoverVersion int64, + newWorkflowFailoverVersion *int64, request *persistence.UpdateWorkflowExecutionRequest, ) (*persistence.UpdateWorkflowExecutionResponse, error) { @@ -471,8 +519,14 @@ func updateWorkflowExecution( emitCompletionMetrics( shard, namespaceEntry, - mutationToCompletionMetric(&request.UpdateWorkflowMutation), - snapshotToCompletionMetric(request.NewWorkflowSnapshot), + mutationToCompletionMetric( + namespaceState(shard.GetClusterMetadata(), &updateWorkflowFailoverVersion), + &request.UpdateWorkflowMutation, + ), + snapshotToCompletionMetric( + namespaceState(shard.GetClusterMetadata(), newWorkflowFailoverVersion), + request.NewWorkflowSnapshot, + ), ) } @@ -639,28 +693,32 @@ func emitGetMetrics( } func snapshotToCompletionMetric( + namespaceState string, workflowSnapshot *persistence.WorkflowSnapshot, ) completionMetric { if workflowSnapshot == nil { return completionMetric{initialized: false} } return completionMetric{ - initialized: true, - taskQueue: workflowSnapshot.ExecutionInfo.TaskQueue, - status: workflowSnapshot.ExecutionState.Status, + initialized: true, + taskQueue: workflowSnapshot.ExecutionInfo.TaskQueue, + namespaceState: namespaceState, + status: workflowSnapshot.ExecutionState.Status, } } func mutationToCompletionMetric( + namespaceState string, workflowMutation *persistence.WorkflowMutation, ) completionMetric { if workflowMutation == nil { return completionMetric{initialized: false} } return completionMetric{ - initialized: true, - taskQueue: workflowMutation.ExecutionInfo.TaskQueue, - status: workflowMutation.ExecutionState.Status, + initialized: true, + taskQueue: workflowMutation.ExecutionInfo.TaskQueue, + namespaceState: namespaceState, + status: workflowMutation.ExecutionState.Status, } } @@ -679,6 +737,7 @@ func emitCompletionMetrics( emitWorkflowCompletionStats( metricsHandler, namespaceName, + completionMetric.namespaceState, completionMetric.taskQueue, completionMetric.status, ) diff --git a/service/history/workflow/transaction_mock.go b/service/history/workflow/transaction_mock.go index b7981117724..9f3d6eb985e 100644 --- a/service/history/workflow/transaction_mock.go +++ b/service/history/workflow/transaction_mock.go @@ -60,9 +60,9 @@ func (m *MockTransaction) EXPECT() *MockTransactionMockRecorder { } // ConflictResolveWorkflowExecution mocks base method. -func (m *MockTransaction) ConflictResolveWorkflowExecution(ctx context.Context, conflictResolveMode persistence.ConflictResolveWorkflowMode, resetWorkflowSnapshot *persistence.WorkflowSnapshot, resetWorkflowEventsSeq []*persistence.WorkflowEvents, newWorkflowSnapshot *persistence.WorkflowSnapshot, newWorkflowEventsSeq []*persistence.WorkflowEvents, currentWorkflowMutation *persistence.WorkflowMutation, currentWorkflowEventsSeq []*persistence.WorkflowEvents) (int64, int64, int64, error) { +func (m *MockTransaction) ConflictResolveWorkflowExecution(ctx context.Context, conflictResolveMode persistence.ConflictResolveWorkflowMode, resetWorkflowFailoverVersion int64, resetWorkflowSnapshot *persistence.WorkflowSnapshot, resetWorkflowEventsSeq []*persistence.WorkflowEvents, newWorkflowFailoverVersion *int64, newWorkflowSnapshot *persistence.WorkflowSnapshot, newWorkflowEventsSeq []*persistence.WorkflowEvents, currentWorkflowFailoverVersion *int64, currentWorkflowMutation *persistence.WorkflowMutation, currentWorkflowEventsSeq []*persistence.WorkflowEvents) (int64, int64, int64, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ConflictResolveWorkflowExecution", ctx, conflictResolveMode, resetWorkflowSnapshot, resetWorkflowEventsSeq, newWorkflowSnapshot, newWorkflowEventsSeq, currentWorkflowMutation, currentWorkflowEventsSeq) + ret := m.ctrl.Call(m, "ConflictResolveWorkflowExecution", ctx, conflictResolveMode, resetWorkflowFailoverVersion, resetWorkflowSnapshot, resetWorkflowEventsSeq, newWorkflowFailoverVersion, newWorkflowSnapshot, newWorkflowEventsSeq, currentWorkflowFailoverVersion, currentWorkflowMutation, currentWorkflowEventsSeq) ret0, _ := ret[0].(int64) ret1, _ := ret[1].(int64) ret2, _ := ret[2].(int64) @@ -71,24 +71,24 @@ func (m *MockTransaction) ConflictResolveWorkflowExecution(ctx context.Context, } // ConflictResolveWorkflowExecution indicates an expected call of ConflictResolveWorkflowExecution. -func (mr *MockTransactionMockRecorder) ConflictResolveWorkflowExecution(ctx, conflictResolveMode, resetWorkflowSnapshot, resetWorkflowEventsSeq, newWorkflowSnapshot, newWorkflowEventsSeq, currentWorkflowMutation, currentWorkflowEventsSeq interface{}) *gomock.Call { +func (mr *MockTransactionMockRecorder) ConflictResolveWorkflowExecution(ctx, conflictResolveMode, resetWorkflowFailoverVersion, resetWorkflowSnapshot, resetWorkflowEventsSeq, newWorkflowFailoverVersion, newWorkflowSnapshot, newWorkflowEventsSeq, currentWorkflowFailoverVersion, currentWorkflowMutation, currentWorkflowEventsSeq interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ConflictResolveWorkflowExecution", reflect.TypeOf((*MockTransaction)(nil).ConflictResolveWorkflowExecution), ctx, conflictResolveMode, resetWorkflowSnapshot, resetWorkflowEventsSeq, newWorkflowSnapshot, newWorkflowEventsSeq, currentWorkflowMutation, currentWorkflowEventsSeq) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ConflictResolveWorkflowExecution", reflect.TypeOf((*MockTransaction)(nil).ConflictResolveWorkflowExecution), ctx, conflictResolveMode, resetWorkflowFailoverVersion, resetWorkflowSnapshot, resetWorkflowEventsSeq, newWorkflowFailoverVersion, newWorkflowSnapshot, newWorkflowEventsSeq, currentWorkflowFailoverVersion, currentWorkflowMutation, currentWorkflowEventsSeq) } // CreateWorkflowExecution mocks base method. -func (m *MockTransaction) CreateWorkflowExecution(ctx context.Context, createMode persistence.CreateWorkflowMode, newWorkflowSnapshot *persistence.WorkflowSnapshot, newWorkflowEventsSeq []*persistence.WorkflowEvents) (int64, error) { +func (m *MockTransaction) CreateWorkflowExecution(ctx context.Context, createMode persistence.CreateWorkflowMode, newWorkflowFailoverVersion int64, newWorkflowSnapshot *persistence.WorkflowSnapshot, newWorkflowEventsSeq []*persistence.WorkflowEvents) (int64, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "CreateWorkflowExecution", ctx, createMode, newWorkflowSnapshot, newWorkflowEventsSeq) + ret := m.ctrl.Call(m, "CreateWorkflowExecution", ctx, createMode, newWorkflowFailoverVersion, newWorkflowSnapshot, newWorkflowEventsSeq) ret0, _ := ret[0].(int64) ret1, _ := ret[1].(error) return ret0, ret1 } // CreateWorkflowExecution indicates an expected call of CreateWorkflowExecution. -func (mr *MockTransactionMockRecorder) CreateWorkflowExecution(ctx, createMode, newWorkflowSnapshot, newWorkflowEventsSeq interface{}) *gomock.Call { +func (mr *MockTransactionMockRecorder) CreateWorkflowExecution(ctx, createMode, newWorkflowFailoverVersion, newWorkflowSnapshot, newWorkflowEventsSeq interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateWorkflowExecution", reflect.TypeOf((*MockTransaction)(nil).CreateWorkflowExecution), ctx, createMode, newWorkflowSnapshot, newWorkflowEventsSeq) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateWorkflowExecution", reflect.TypeOf((*MockTransaction)(nil).CreateWorkflowExecution), ctx, createMode, newWorkflowFailoverVersion, newWorkflowSnapshot, newWorkflowEventsSeq) } // SetWorkflowExecution mocks base method. @@ -106,9 +106,9 @@ func (mr *MockTransactionMockRecorder) SetWorkflowExecution(ctx, workflowSnapsho } // UpdateWorkflowExecution mocks base method. -func (m *MockTransaction) UpdateWorkflowExecution(ctx context.Context, updateMode persistence.UpdateWorkflowMode, currentWorkflowMutation *persistence.WorkflowMutation, currentWorkflowEventsSeq []*persistence.WorkflowEvents, newWorkflowSnapshot *persistence.WorkflowSnapshot, newWorkflowEventsSeq []*persistence.WorkflowEvents) (int64, int64, error) { +func (m *MockTransaction) UpdateWorkflowExecution(ctx context.Context, updateMode persistence.UpdateWorkflowMode, currentWorkflowFailoverVersion int64, currentWorkflowMutation *persistence.WorkflowMutation, currentWorkflowEventsSeq []*persistence.WorkflowEvents, newWorkflowFailoverVersion *int64, newWorkflowSnapshot *persistence.WorkflowSnapshot, newWorkflowEventsSeq []*persistence.WorkflowEvents) (int64, int64, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "UpdateWorkflowExecution", ctx, updateMode, currentWorkflowMutation, currentWorkflowEventsSeq, newWorkflowSnapshot, newWorkflowEventsSeq) + ret := m.ctrl.Call(m, "UpdateWorkflowExecution", ctx, updateMode, currentWorkflowFailoverVersion, currentWorkflowMutation, currentWorkflowEventsSeq, newWorkflowFailoverVersion, newWorkflowSnapshot, newWorkflowEventsSeq) ret0, _ := ret[0].(int64) ret1, _ := ret[1].(int64) ret2, _ := ret[2].(error) @@ -116,7 +116,7 @@ func (m *MockTransaction) UpdateWorkflowExecution(ctx context.Context, updateMod } // UpdateWorkflowExecution indicates an expected call of UpdateWorkflowExecution. -func (mr *MockTransactionMockRecorder) UpdateWorkflowExecution(ctx, updateMode, currentWorkflowMutation, currentWorkflowEventsSeq, newWorkflowSnapshot, newWorkflowEventsSeq interface{}) *gomock.Call { +func (mr *MockTransactionMockRecorder) UpdateWorkflowExecution(ctx, updateMode, currentWorkflowFailoverVersion, currentWorkflowMutation, currentWorkflowEventsSeq, newWorkflowFailoverVersion, newWorkflowSnapshot, newWorkflowEventsSeq interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateWorkflowExecution", reflect.TypeOf((*MockTransaction)(nil).UpdateWorkflowExecution), ctx, updateMode, currentWorkflowMutation, currentWorkflowEventsSeq, newWorkflowSnapshot, newWorkflowEventsSeq) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateWorkflowExecution", reflect.TypeOf((*MockTransaction)(nil).UpdateWorkflowExecution), ctx, updateMode, currentWorkflowFailoverVersion, currentWorkflowMutation, currentWorkflowEventsSeq, newWorkflowFailoverVersion, newWorkflowSnapshot, newWorkflowEventsSeq) } diff --git a/service/history/workflow/transaction_test.go b/service/history/workflow/transaction_test.go index d2280a148f1..a5305fcad91 100644 --- a/service/history/workflow/transaction_test.go +++ b/service/history/workflow/transaction_test.go @@ -35,6 +35,7 @@ import ( "go.temporal.io/api/serviceerror" persistencespb "go.temporal.io/server/api/persistence/v1" + "go.temporal.io/server/common/convert" "go.temporal.io/server/common/log" "go.temporal.io/server/common/namespace" "go.temporal.io/server/common/persistence" @@ -120,6 +121,7 @@ func (s *transactionSuite) TestCreateWorkflowExecution_NotifyTaskWhenFailed() { _, err := s.transaction.CreateWorkflowExecution( context.Background(), persistence.CreateWorkflowModeBrandNew, + 0, &persistence.WorkflowSnapshot{ ExecutionInfo: &persistencespb.WorkflowExecutionInfo{ NamespaceId: tests.NamespaceID.String(), @@ -145,6 +147,7 @@ func (s *transactionSuite) TestUpdateWorkflowExecution_NotifyTaskWhenFailed() { _, _, err := s.transaction.UpdateWorkflowExecution( context.Background(), persistence.UpdateWorkflowModeUpdateCurrent, + 0, &persistence.WorkflowMutation{ ExecutionInfo: &persistencespb.WorkflowExecutionInfo{ NamespaceId: tests.NamespaceID.String(), @@ -155,6 +158,7 @@ func (s *transactionSuite) TestUpdateWorkflowExecution_NotifyTaskWhenFailed() { }, }, []*persistence.WorkflowEvents{}, + convert.Int64Ptr(0), &persistence.WorkflowSnapshot{}, []*persistence.WorkflowEvents{}, ) @@ -173,6 +177,7 @@ func (s *transactionSuite) TestConflictResolveWorkflowExecution_NotifyTaskWhenFa _, _, _, err := s.transaction.ConflictResolveWorkflowExecution( context.Background(), persistence.ConflictResolveWorkflowModeUpdateCurrent, + 0, &persistence.WorkflowSnapshot{ ExecutionInfo: &persistencespb.WorkflowExecutionInfo{ NamespaceId: tests.NamespaceID.String(), @@ -183,8 +188,10 @@ func (s *transactionSuite) TestConflictResolveWorkflowExecution_NotifyTaskWhenFa }, }, []*persistence.WorkflowEvents{}, + convert.Int64Ptr(0), &persistence.WorkflowSnapshot{}, []*persistence.WorkflowEvents{}, + convert.Int64Ptr(0), &persistence.WorkflowMutation{}, []*persistence.WorkflowEvents{}, )