diff --git a/common/log/tag/tags.go b/common/log/tag/tags.go index 8c0010f2a38..a0d3b762017 100644 --- a/common/log/tag/tags.go +++ b/common/log/tag/tags.go @@ -214,6 +214,11 @@ func IsWorkflowOpen(isOpen bool) Tag { return newBoolTag("is-workflow-open", isOpen) } +// WorkflowTerminationReason returns a tag to report a workflow's termination reason +func WorkflowTerminationReason(reason string) Tag { + return newStringTag("wf-termination-reason", reason) +} + // domain related // WorkflowDomainID returns tag for WorkflowDomainID diff --git a/common/metrics/defs.go b/common/metrics/defs.go index 27053266e46..4af92cac865 100644 --- a/common/metrics/defs.go +++ b/common/metrics/defs.go @@ -2247,6 +2247,7 @@ const ( TaskQueueLatencyPerDomain TransferTaskMissingEventCounterPerDomain ReplicationTasksAppliedPerDomain + WorkflowTerminateCounterPerDomain TaskRedispatchQueuePendingTasksTimer @@ -2898,6 +2899,7 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{ TaskQueueLatencyPerDomain: {metricName: "task_latency_queue_per_domain", metricRollupName: "task_latency_queue", metricType: Timer}, TransferTaskMissingEventCounterPerDomain: {metricName: "transfer_task_missing_event_counter_per_domain", metricRollupName: "transfer_task_missing_event_counter", metricType: Counter}, ReplicationTasksAppliedPerDomain: {metricName: "replication_tasks_applied_per_domain", metricRollupName: "replication_tasks_applied", metricType: Counter}, + WorkflowTerminateCounterPerDomain: {metricName: "workflow_terminate_counter_per_domain", metricRollupName: "workflow_terminate_counter", metricType: Counter}, TaskBatchCompleteCounter: {metricName: "task_batch_complete_counter", metricType: Counter}, TaskBatchCompleteFailure: {metricName: "task_batch_complete_error", metricType: Counter}, diff --git a/common/metrics/tags.go b/common/metrics/tags.go index 65897d5ad74..86d4e23b1e0 100644 --- a/common/metrics/tags.go +++ b/common/metrics/tags.go @@ -62,6 +62,7 @@ const ( globalRatelimitKey = "global_ratelimit_key" globalRatelimitType = "global_ratelimit_type" globalRatelimitCollectionName = "global_ratelimit_collection" + workflowTerminationReason = "workflow_termination_reason" allValue = "all" unknownValue = "_unknown_" @@ -262,6 +263,11 @@ func GlobalRatelimiterCollectionName(value string) Tag { return simpleMetric{key: globalRatelimitCollectionName, value: value} } +// WorkflowTerminationReasonTag reports the reason for workflow termination +func WorkflowTerminationReasonTag(value string) Tag { + return simpleMetric{key: workflowTerminationReason, value: value} +} + // PartitionConfigTags returns a list of partition config tags func PartitionConfigTags(partitionConfig map[string]string) []Tag { tags := make([]Tag, 0, len(partitionConfig)) diff --git a/service/history/execution/mutable_state_builder.go b/service/history/execution/mutable_state_builder.go index e83ff1c31d6..5bac87ef07c 100644 --- a/service/history/execution/mutable_state_builder.go +++ b/service/history/execution/mutable_state_builder.go @@ -1181,6 +1181,22 @@ func (e *mutableStateBuilder) AddWorkflowExecutionTerminatedEvent( if err := e.ReplicateWorkflowExecutionTerminatedEvent(firstEventID, event); err != nil { return nil, err } + + domainName := e.GetDomainEntry().GetInfo().Name + + e.logger.Info( + "Workflow execution terminated.", + tag.WorkflowDomainName(domainName), + tag.WorkflowID(e.GetExecutionInfo().WorkflowID), + tag.WorkflowRunID(e.GetExecutionInfo().RunID), + tag.WorkflowTerminationReason(reason), + ) + + scopeWithDomainTag := e.metricsClient.Scope(metrics.HistoryTerminateWorkflowExecutionScope). + Tagged(metrics.DomainTag(domainName)). + Tagged(metrics.WorkflowTerminationReasonTag(reason)) + scopeWithDomainTag.IncCounter(metrics.WorkflowTerminateCounterPerDomain) + return event, nil } diff --git a/service/history/execution/workflow.go b/service/history/execution/workflow.go index 236a15c4764..d87f9c41d60 100644 --- a/service/history/execution/workflow.go +++ b/service/history/execution/workflow.go @@ -36,7 +36,7 @@ const ( IdentityHistoryService = "history-service" // WorkflowTerminationIdentity is the component which decides to terminate the workflow WorkflowTerminationIdentity = "worker-service" - // WorkflowTerminationReason is the reason for terminating workflow due to version conflit + // WorkflowTerminationReason is the reason for terminating workflow due to version conflict WorkflowTerminationReason = "Terminate Workflow Due To Version Conflict." ) @@ -189,7 +189,7 @@ func (r *workflowImpl) SuppressBy( currentCluster := r.clusterMetadata.GetCurrentClusterName() if currentCluster == lastWriteCluster { - return TransactionPolicyActive, r.terminateWorkflow(lastWriteVersion, incomingLastWriteVersion) + return TransactionPolicyActive, r.terminateWorkflow(lastWriteVersion, incomingLastWriteVersion, WorkflowTerminationReason) } return TransactionPolicyPassive, r.zombiefyWorkflow() } @@ -251,6 +251,7 @@ func (r *workflowImpl) failDecision( func (r *workflowImpl) terminateWorkflow( lastWriteVersion int64, incomingLastWriteVersion int64, + terminationReason string, ) error { eventBatchFirstEventID := r.GetMutableState().GetNextEventID() @@ -265,7 +266,7 @@ func (r *workflowImpl) terminateWorkflow( _, err := r.mutableState.AddWorkflowExecutionTerminatedEvent( eventBatchFirstEventID, - WorkflowTerminationReason, + terminationReason, []byte(fmt.Sprintf("terminated by version: %v", incomingLastWriteVersion)), WorkflowTerminationIdentity, )