Skip to content

Commit

Permalink
Mark state transition & completion metrics with namespace activeness (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
wxing1292 authored Jan 30, 2023
1 parent 6e732e7 commit 9a4f19e
Show file tree
Hide file tree
Showing 13 changed files with 264 additions and 93 deletions.
30 changes: 21 additions & 9 deletions common/metrics/tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,16 @@ const (
buildPlatformTag = "build_platform"
goVersionTag = "go_version"

instance = "instance"
namespace = "namespace"
targetCluster = "target_cluster"
taskQueue = "taskqueue"
workflowType = "workflowType"
activityType = "activityType"
commandType = "commandType"
serviceName = "service_name"
actionType = "action_type"
instance = "instance"
namespace = "namespace"
namespaceState = "namespace_state"
targetCluster = "target_cluster"
taskQueue = "taskqueue"
workflowType = "workflowType"
activityType = "activityType"
commandType = "commandType"
serviceName = "service_name"
actionType = "action_type"

namespaceAllValue = "all"
unknownValue = "_unknown_"
Expand Down Expand Up @@ -103,6 +104,17 @@ func NamespaceUnknownTag() Tag {
return namespaceUnknownTag
}

// NamespaceStateTag returns a new namespace state tag.
func NamespaceStateTag(value string) Tag {
if len(value) == 0 {
value = unknownValue
}
return &tagImpl{
key: namespaceState,
value: value,
}
}

var taskQueueUnknownTag = &tagImpl{key: taskQueue, value: unknownValue}

// TaskQueueUnknownTag returns a new taskqueue:unknown tag-value
Expand Down
2 changes: 2 additions & 0 deletions service/history/historyEngine2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,8 @@ func (s *engine2Suite) SetupTest() {
s.mockNamespaceCache.EXPECT().GetNamespaceByID(tests.ParentNamespaceID).Return(tests.GlobalParentNamespaceEntry, nil).AnyTimes()
s.mockNamespaceCache.EXPECT().GetNamespace(tests.ChildNamespace).Return(tests.GlobalChildNamespaceEntry, nil).AnyTimes()
s.mockEventsCache.EXPECT().PutEvent(gomock.Any(), gomock.Any()).AnyTimes()
s.mockClusterMetadata.EXPECT().GetClusterID().Return(tests.Version).AnyTimes()
s.mockClusterMetadata.EXPECT().IsVersionFromSameCluster(tests.Version, tests.Version).Return(true).AnyTimes()
s.mockClusterMetadata.EXPECT().IsGlobalNamespaceEnabled().Return(false).AnyTimes()
s.mockClusterMetadata.EXPECT().GetCurrentClusterName().Return(cluster.TestCurrentClusterName).AnyTimes()
s.mockClusterMetadata.EXPECT().ClusterNameForFailoverVersion(false, common.EmptyVersion).Return(cluster.TestCurrentClusterName).AnyTimes()
Expand Down
3 changes: 3 additions & 0 deletions service/history/ndc/workflow_resetter.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
enumspb "go.temporal.io/api/enums/v1"
historypb "go.temporal.io/api/history/v1"
"go.temporal.io/api/serviceerror"

persistencespb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common"
"go.temporal.io/server/common/cluster"
Expand Down Expand Up @@ -364,8 +365,10 @@ func (r *workflowResetterImpl) persistToDB(
if currentWorkflowSizeDiff, resetWorkflowSizeDiff, err := r.transaction.UpdateWorkflowExecution(
ctx,
persistence.UpdateWorkflowModeUpdateCurrent,
currentWorkflow.GetMutableState().GetCurrentVersion(),
currentWorkflowMutation,
currentWorkflowEventsSeq,
workflow.MutableStateFailoverVersion(resetWorkflow.GetMutableState()),
resetWorkflowSnapshot,
resetWorkflowEventsSeq,
); err != nil {
Expand Down
25 changes: 23 additions & 2 deletions service/history/ndc/workflow_resetter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ import (
enumspb "go.temporal.io/api/enums/v1"
historypb "go.temporal.io/api/history/v1"
taskqueuepb "go.temporal.io/api/taskqueue/v1"

historyspb "go.temporal.io/server/api/history/v1"
persistencespb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common"
"go.temporal.io/server/common/collection"
Expand All @@ -47,6 +49,7 @@ import (
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/payloads"
"go.temporal.io/server/common/persistence"
"go.temporal.io/server/common/persistence/versionhistory"
"go.temporal.io/server/common/primitives/timestamp"
"go.temporal.io/server/service/history/consts"
"go.temporal.io/server/service/history/shard"
Expand Down Expand Up @@ -139,10 +142,18 @@ func (s *workflowResetterSuite) TestPersistToDB_CurrentTerminated() {
currentWorkflow.EXPECT().GetMutableState().Return(currentMutableState).AnyTimes()
currentWorkflow.EXPECT().GetReleaseFn().Return(currentReleaseFn).AnyTimes()

currentMutableState.EXPECT().GetCurrentVersion().Return(int64(0)).AnyTimes()
currentEventsSize := int64(2333)
currentNewEventsSize := int64(3444)
currentMutation := &persistence.WorkflowMutation{
ExecutionInfo: &persistencespb.WorkflowExecutionInfo{},
ExecutionInfo: &persistencespb.WorkflowExecutionInfo{
VersionHistories: versionhistory.NewVersionHistories(&historyspb.VersionHistory{
BranchToken: []byte{1, 2, 3},
Items: []*historyspb.VersionHistoryItem{
{EventId: 234, Version: 0},
},
}),
},
}
currentEventsSeq := []*persistence.WorkflowEvents{{
NamespaceID: s.namespaceID.String(),
Expand All @@ -165,10 +176,18 @@ func (s *workflowResetterSuite) TestPersistToDB_CurrentTerminated() {
resetWorkflow.EXPECT().GetMutableState().Return(resetMutableState).AnyTimes()
resetWorkflow.EXPECT().GetReleaseFn().Return(tarGetReleaseFn).AnyTimes()

resetMutableState.EXPECT().GetCurrentVersion().Return(int64(0)).AnyTimes()
resetEventsSize := int64(1444)
resetNewEventsSize := int64(4321)
resetSnapshot := &persistence.WorkflowSnapshot{
ExecutionInfo: &persistencespb.WorkflowExecutionInfo{},
ExecutionInfo: &persistencespb.WorkflowExecutionInfo{
VersionHistories: versionhistory.NewVersionHistories(&historyspb.VersionHistory{
BranchToken: []byte{1, 2, 3},
Items: []*historyspb.VersionHistoryItem{
{EventId: 123, Version: 0},
},
}),
},
}
resetEventsSeq := []*persistence.WorkflowEvents{{
NamespaceID: s.namespaceID.String(),
Expand All @@ -189,8 +208,10 @@ func (s *workflowResetterSuite) TestPersistToDB_CurrentTerminated() {
s.mockTransaction.EXPECT().UpdateWorkflowExecution(
gomock.Any(),
persistence.UpdateWorkflowModeUpdateCurrent,
int64(0),
currentMutation,
currentEventsSeq,
convert.Int64Ptr(0),
resetSnapshot,
resetEventsSeq,
).Return(currentNewEventsSize, resetNewEventsSize, nil)
Expand Down
5 changes: 3 additions & 2 deletions service/history/timerQueueActiveTaskExecutor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ import (
"go.temporal.io/server/common/persistence"
"go.temporal.io/server/common/persistence/versionhistory"
"go.temporal.io/server/common/primitives/timestamp"
deletemanager "go.temporal.io/server/service/history/deletemanager"
"go.temporal.io/server/service/history/deletemanager"
"go.temporal.io/server/service/history/events"
"go.temporal.io/server/service/history/queues"
"go.temporal.io/server/service/history/shard"
Expand Down Expand Up @@ -154,7 +154,8 @@ func (s *timerQueueActiveTaskExecutorSuite) SetupTest() {
// ack manager will use the namespace information
s.mockNamespaceCache.EXPECT().GetNamespaceByID(gomock.Any()).Return(tests.GlobalNamespaceEntry, nil).AnyTimes()
s.mockNamespaceCache.EXPECT().GetNamespaceName(gomock.Any()).Return(tests.Namespace, nil).AnyTimes()
s.mockClusterMetadata.EXPECT().GetClusterID().Return(cluster.TestCurrentClusterInitialFailoverVersion).AnyTimes()
s.mockClusterMetadata.EXPECT().GetClusterID().Return(tests.Version).AnyTimes()
s.mockClusterMetadata.EXPECT().IsVersionFromSameCluster(tests.Version, tests.Version).Return(true).AnyTimes()
s.mockClusterMetadata.EXPECT().GetCurrentClusterName().Return(cluster.TestCurrentClusterName).AnyTimes()
s.mockClusterMetadata.EXPECT().GetAllClusterInfo().Return(cluster.TestAllClusterInfo).AnyTimes()
s.mockClusterMetadata.EXPECT().IsGlobalNamespaceEnabled().Return(true).AnyTimes()
Expand Down
5 changes: 3 additions & 2 deletions service/history/timerQueueStandbyTaskExecutor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ import (
"go.temporal.io/server/common/primitives/timestamp"
"go.temporal.io/server/common/xdc"
"go.temporal.io/server/service/history/consts"
deletemanager "go.temporal.io/server/service/history/deletemanager"
"go.temporal.io/server/service/history/deletemanager"
"go.temporal.io/server/service/history/events"
"go.temporal.io/server/service/history/queues"
"go.temporal.io/server/service/history/shard"
Expand Down Expand Up @@ -159,7 +159,8 @@ func (s *timerQueueStandbyTaskExecutorSuite) SetupTest() {
s.mockMatchingClient = s.mockShard.Resource.MatchingClient
s.mockNamespaceCache.EXPECT().GetNamespaceByID(gomock.Any()).Return(s.namespaceEntry, nil).AnyTimes()
s.mockNamespaceCache.EXPECT().GetNamespaceName(gomock.Any()).Return(s.namespaceEntry.Name(), nil).AnyTimes()
s.mockClusterMetadata.EXPECT().GetClusterID().Return(cluster.TestCurrentClusterInitialFailoverVersion).AnyTimes()
s.mockClusterMetadata.EXPECT().GetClusterID().Return(tests.Version).AnyTimes()
s.mockClusterMetadata.EXPECT().IsVersionFromSameCluster(tests.Version, tests.Version).Return(true).AnyTimes()
s.mockClusterMetadata.EXPECT().GetCurrentClusterName().Return(cluster.TestCurrentClusterName).AnyTimes()
s.mockClusterMetadata.EXPECT().GetAllClusterInfo().Return(cluster.TestAllClusterInfo).AnyTimes()
s.mockClusterMetadata.EXPECT().IsGlobalNamespaceEnabled().Return(true).AnyTimes()
Expand Down
5 changes: 3 additions & 2 deletions service/history/transferQueueActiveTaskExecutor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ import (
"go.temporal.io/server/common/searchattribute"
"go.temporal.io/server/service/history/configs"
"go.temporal.io/server/service/history/consts"
deletemanager "go.temporal.io/server/service/history/deletemanager"
"go.temporal.io/server/service/history/deletemanager"
"go.temporal.io/server/service/history/events"
"go.temporal.io/server/service/history/queues"
"go.temporal.io/server/service/history/shard"
Expand Down Expand Up @@ -199,7 +199,8 @@ func (s *transferQueueActiveTaskExecutorSuite) SetupTest() {
s.mockNamespaceCache.EXPECT().GetNamespaceByID(tests.ChildNamespaceID).Return(tests.GlobalChildNamespaceEntry, nil).AnyTimes()
s.mockNamespaceCache.EXPECT().GetNamespace(tests.ChildNamespace).Return(tests.GlobalChildNamespaceEntry, nil).AnyTimes()
s.mockNamespaceCache.EXPECT().GetNamespaceByID(tests.MissedNamespaceID).Return(nil, serviceerror.NewNamespaceNotFound(tests.MissedNamespaceID.String())).AnyTimes()
s.mockClusterMetadata.EXPECT().GetClusterID().Return(cluster.TestCurrentClusterInitialFailoverVersion).AnyTimes()
s.mockClusterMetadata.EXPECT().GetClusterID().Return(tests.Version).AnyTimes()
s.mockClusterMetadata.EXPECT().IsVersionFromSameCluster(tests.Version, tests.Version).Return(true).AnyTimes()
s.mockClusterMetadata.EXPECT().GetCurrentClusterName().Return(cluster.TestCurrentClusterName).AnyTimes()
s.mockClusterMetadata.EXPECT().GetAllClusterInfo().Return(cluster.TestAllClusterInfo).AnyTimes()
s.mockClusterMetadata.EXPECT().IsGlobalNamespaceEnabled().Return(true).AnyTimes()
Expand Down
102 changes: 79 additions & 23 deletions service/history/workflow/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ import (
persistencespb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common"
"go.temporal.io/server/common/clock"
"go.temporal.io/server/common/cluster"
"go.temporal.io/server/common/convert"
"go.temporal.io/server/common/definition"
"go.temporal.io/server/common/locks"
"go.temporal.io/server/common/log"
Expand Down Expand Up @@ -147,13 +149,14 @@ type (

type (
ContextImpl struct {
shard shard.Context
workflowKey definition.WorkflowKey
logger log.Logger
metricsHandler metrics.Handler
timeSource clock.TimeSource
config *configs.Config
transaction Transaction
shard shard.Context
workflowKey definition.WorkflowKey
logger log.Logger
metricsHandler metrics.Handler
clusterMetadata cluster.Metadata
timeSource clock.TimeSource
config *configs.Config
transaction Transaction

mutex locks.PriorityMutex
MutableState MutableState
Expand All @@ -169,14 +172,15 @@ func NewContext(
logger log.Logger,
) *ContextImpl {
return &ContextImpl{
shard: shard,
workflowKey: workflowKey,
logger: logger,
metricsHandler: shard.GetMetricsHandler().WithTags(metrics.OperationTag(metrics.WorkflowContextScope)),
timeSource: shard.GetTimeSource(),
config: shard.GetConfig(),
mutex: locks.NewPriorityMutex(),
transaction: NewTransaction(shard),
shard: shard,
workflowKey: workflowKey,
logger: logger,
metricsHandler: shard.GetMetricsHandler().WithTags(metrics.OperationTag(metrics.WorkflowContextScope)),
clusterMetadata: shard.GetClusterMetadata(),
timeSource: shard.GetTimeSource(),
config: shard.GetConfig(),
mutex: locks.NewPriorityMutex(),
transaction: NewTransaction(shard),
stats: &persistencespb.ExecutionStats{
HistorySize: 0,
},
Expand Down Expand Up @@ -349,6 +353,7 @@ func (c *ContextImpl) CreateWorkflowExecution(
resp, err := createWorkflowExecution(
ctx,
c.shard,
newMutableState.GetCurrentVersion(),
createRequest,
)
if err != nil {
Expand All @@ -361,7 +366,7 @@ func (c *ContextImpl) CreateWorkflowExecution(
return err
}
NotifyWorkflowSnapshotTasks(engine, newWorkflow)
emitStateTransitionCount(c.metricsHandler, newMutableState)
emitStateTransitionCount(c.metricsHandler, c.clusterMetadata, newMutableState)

return nil
}
Expand Down Expand Up @@ -451,10 +456,13 @@ func (c *ContextImpl) ConflictResolveWorkflowExecution(
if resetWorkflowSizeDiff, newWorkflowSizeDiff, currentWorkflowSizeDiff, err := c.transaction.ConflictResolveWorkflowExecution(
ctx,
conflictResolveMode,
resetMutableState.GetCurrentVersion(),
resetWorkflow,
resetWorkflowEventsSeq,
MutableStateFailoverVersion(newMutableState),
newWorkflow,
newWorkflowEventsSeq,
MutableStateFailoverVersion(currentMutableState),
currentWorkflow,
currentWorkflowEventsSeq,
); err != nil {
Expand All @@ -469,9 +477,9 @@ func (c *ContextImpl) ConflictResolveWorkflowExecution(
}
}

emitStateTransitionCount(c.metricsHandler, resetMutableState)
emitStateTransitionCount(c.metricsHandler, newMutableState)
emitStateTransitionCount(c.metricsHandler, currentMutableState)
emitStateTransitionCount(c.metricsHandler, c.clusterMetadata, resetMutableState)
emitStateTransitionCount(c.metricsHandler, c.clusterMetadata, newMutableState)
emitStateTransitionCount(c.metricsHandler, c.clusterMetadata, currentMutableState)

return nil
}
Expand Down Expand Up @@ -628,8 +636,10 @@ func (c *ContextImpl) UpdateWorkflowExecutionWithNew(
if currentWorkflowSizeDiff, newWorkflowSizeDiff, err := c.transaction.UpdateWorkflowExecution(
ctx,
updateMode,
c.MutableState.GetCurrentVersion(),
currentWorkflow,
currentWorkflowEventsSeq,
MutableStateFailoverVersion(newMutableState),
newWorkflow,
newWorkflowEventsSeq,
); err != nil {
Expand All @@ -641,8 +651,8 @@ func (c *ContextImpl) UpdateWorkflowExecutionWithNew(
}
}

emitStateTransitionCount(c.metricsHandler, c.MutableState)
emitStateTransitionCount(c.metricsHandler, newMutableState)
emitStateTransitionCount(c.metricsHandler, c.clusterMetadata, c.MutableState)
emitStateTransitionCount(c.metricsHandler, c.clusterMetadata, newMutableState)

// finally emit session stats
namespace := c.GetNamespace()
Expand Down Expand Up @@ -922,12 +932,58 @@ func (c *ContextImpl) enforceSizeCheck(

func emitStateTransitionCount(
metricsHandler metrics.Handler,
clusterMetadata cluster.Metadata,
mutableState MutableState,
) {
if mutableState == nil {
return
}

metricsHandler.Histogram(metrics.StateTransitionCount.GetMetricName(), metrics.StateTransitionCount.GetMetricUnit()).
Record(mutableState.GetExecutionInfo().StateTransitionCount, metrics.NamespaceTag(mutableState.GetNamespaceEntry().Name().String()))
namespaceEntry := mutableState.GetNamespaceEntry()
metricsHandler.Histogram(
metrics.StateTransitionCount.GetMetricName(),
metrics.StateTransitionCount.GetMetricUnit(),
).Record(
mutableState.GetExecutionInfo().StateTransitionCount,
metrics.NamespaceTag(namespaceEntry.Name().String()),
metrics.NamespaceStateTag(namespaceState(clusterMetadata, convert.Int64Ptr(mutableState.GetCurrentVersion()))),
)
}

const (
namespaceStateActive = "active"
namespaceStatePassive = "passive"
namespaceStateUnknown = "_unknown_"
)

func namespaceState(
clusterMetadata cluster.Metadata,
mutableStateCurrentVersion *int64,
) string {

if mutableStateCurrentVersion == nil {
return namespaceStateUnknown
}

// default value, need to special handle
if *mutableStateCurrentVersion == 0 {
return namespaceStateActive
}

if clusterMetadata.IsVersionFromSameCluster(
clusterMetadata.GetClusterID(),
*mutableStateCurrentVersion,
) {
return namespaceStateActive
}
return namespaceStatePassive
}

func MutableStateFailoverVersion(
mutableState MutableState,
) *int64 {
if mutableState == nil {
return nil
}
return convert.Int64Ptr(mutableState.GetCurrentVersion())
}
2 changes: 2 additions & 0 deletions service/history/workflow/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,14 @@ func emitMutableStateStatus(
func emitWorkflowCompletionStats(
metricsHandler metrics.Handler,
namespace namespace.Name,
namespaceState string,
taskQueue string,
status enumspb.WorkflowExecutionStatus,
) {
handler := metricsHandler.WithTags(
metrics.OperationTag(metrics.WorkflowCompletionStatsScope),
metrics.NamespaceTag(namespace.String()),
metrics.NamespaceStateTag(namespaceState),
metrics.TaskQueueTag(taskQueue),
)

Expand Down
Loading

0 comments on commit 9a4f19e

Please sign in to comment.