diff --git a/common/metrics/metric_defs.go b/common/metrics/metric_defs.go index cfbbe44687b..9a5c2f47cdc 100644 --- a/common/metrics/metric_defs.go +++ b/common/metrics/metric_defs.go @@ -1474,6 +1474,7 @@ var ( ShardLockLatency = NewTimerDef("shard_lock_latency") NamespaceRegistryLockLatency = NewTimerDef("namespace_registry_lock_latency") ClosedWorkflowBufferEventCount = NewCounterDef("closed_workflow_buffer_event_counter") + InorderBufferedEventsCounter = NewCounterDef("inordered_buffered_events") // Matching MatchingClientForwardedCounter = NewCounterDef("forwarded") diff --git a/service/history/workflow/history_builder.go b/service/history/workflow/history_builder.go index 75ac1924b4e..d1798a0473f 100644 --- a/service/history/workflow/history_builder.go +++ b/service/history/workflow/history_builder.go @@ -39,6 +39,7 @@ import ( "go.temporal.io/server/api/historyservice/v1" "go.temporal.io/server/common" "go.temporal.io/server/common/clock" + "go.temporal.io/server/common/metrics" "go.temporal.io/server/common/namespace" "go.temporal.io/server/common/primitives/timestamp" ) @@ -90,6 +91,8 @@ type ( // scheduled to started event ID mapping scheduledIDToStartedID map[int64]int64 + + metricsHandler metrics.Handler } ) @@ -99,6 +102,7 @@ func NewMutableHistoryBuilder( version int64, nextEventID int64, dbBufferBatch []*historypb.HistoryEvent, + metricsHandler metrics.Handler, ) *HistoryBuilder { return &HistoryBuilder{ state: HistoryBuilderStateMutable, @@ -116,6 +120,8 @@ func NewMutableHistoryBuilder( memLatestBatch: nil, memBufferBatch: nil, scheduledIDToStartedID: make(map[int64]int64), + + metricsHandler: metricsHandler, } } @@ -139,6 +145,8 @@ func NewImmutableHistoryBuilder( memLatestBatch: history, memBufferBatch: nil, scheduledIDToStartedID: nil, + + metricsHandler: nil, } } @@ -1401,6 +1409,7 @@ func (b *HistoryBuilder) wireEventIDs( func (b *HistoryBuilder) reorderBuffer( bufferEvents []*historypb.HistoryEvent, ) []*historypb.HistoryEvent { + b.emitInorderedBufferedEvents(bufferEvents) reorderBuffer := make([]*historypb.HistoryEvent, 0, len(bufferEvents)) reorderEvents := make([]*historypb.HistoryEvent, 0, len(bufferEvents)) for _, event := range bufferEvents { @@ -1423,6 +1432,47 @@ func (b *HistoryBuilder) reorderBuffer( return append(reorderEvents, reorderBuffer...) } +func (b *HistoryBuilder) emitInorderedBufferedEvents(bufferedEvents []*historypb.HistoryEvent) { + completedActivities := make(map[int64]struct{}) + completedChildWorkflows := make(map[int64]struct{}) + var inorderedEventsCount int64 + for _, event := range bufferedEvents { + switch event.GetEventType() { + case enumspb.EVENT_TYPE_ACTIVITY_TASK_STARTED: + if _, seenCompleted := completedActivities[event.GetEventId()]; seenCompleted { + inorderedEventsCount++ + } + case enumspb.EVENT_TYPE_ACTIVITY_TASK_COMPLETED: + completedActivities[event.GetActivityTaskCompletedEventAttributes().GetStartedEventId()] = struct{}{} + case enumspb.EVENT_TYPE_ACTIVITY_TASK_FAILED: + completedActivities[event.GetActivityTaskFailedEventAttributes().GetStartedEventId()] = struct{}{} + case enumspb.EVENT_TYPE_ACTIVITY_TASK_TIMED_OUT: + completedActivities[event.GetActivityTaskTimedOutEventAttributes().GetStartedEventId()] = struct{}{} + case enumspb.EVENT_TYPE_ACTIVITY_TASK_CANCELED: + completedActivities[event.GetActivityTaskCanceledEventAttributes().GetStartedEventId()] = struct{}{} + + case enumspb.EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_STARTED: + if _, seenCompleted := completedChildWorkflows[event.GetEventId()]; seenCompleted { + inorderedEventsCount++ + } + case enumspb.EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_COMPLETED: + completedChildWorkflows[event.GetChildWorkflowExecutionCompletedEventAttributes().GetStartedEventId()] = struct{}{} + case enumspb.EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_FAILED: + completedChildWorkflows[event.GetChildWorkflowExecutionFailedEventAttributes().GetStartedEventId()] = struct{}{} + case enumspb.EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_TIMED_OUT: + completedChildWorkflows[event.GetChildWorkflowExecutionTimedOutEventAttributes().GetStartedEventId()] = struct{}{} + case enumspb.EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_CANCELED: + completedChildWorkflows[event.GetChildWorkflowExecutionCanceledEventAttributes().GetStartedEventId()] = struct{}{} + case enumspb.EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_TERMINATED: + completedChildWorkflows[event.GetChildWorkflowExecutionTerminatedEventAttributes().GetStartedEventId()] = struct{}{} + } + } + + if inorderedEventsCount > 0 && b.metricsHandler != nil { + b.metricsHandler.Counter(metrics.InorderBufferedEventsCounter.GetMetricName()).Record(inorderedEventsCount) + } +} + func (b *HistoryBuilder) HasActivityFinishEvent( scheduledEventID int64, ) bool { diff --git a/service/history/workflow/history_builder_test.go b/service/history/workflow/history_builder_test.go index 82dd34c33ec..0145f18e159 100644 --- a/service/history/workflow/history_builder_test.go +++ b/service/history/workflow/history_builder_test.go @@ -45,6 +45,7 @@ import ( workflowspb "go.temporal.io/server/api/workflow/v1" "go.temporal.io/server/common" "go.temporal.io/server/common/clock" + "go.temporal.io/server/common/metrics" "go.temporal.io/server/common/namespace" "go.temporal.io/server/common/primitives/timestamp" "go.temporal.io/server/service/history/tests" @@ -154,6 +155,7 @@ func (s *historyBuilderSuite) SetupTest() { s.version, s.nextEventID, nil, + metrics.NoopMetricsHandler, ) } @@ -2088,6 +2090,7 @@ func (s *historyBuilderSuite) testWireEventIDs( s.version, s.nextEventID, nil, + metrics.NoopMetricsHandler, ) s.historyBuilder.dbBufferBatch = []*historypb.HistoryEvent{startEvent} s.historyBuilder.memEventsBatches = nil @@ -2134,6 +2137,7 @@ func (s *historyBuilderSuite) TestHasBufferEvent() { s.version, s.nextEventID, nil, + metrics.NoopMetricsHandler, ) historyBuilder.dbBufferBatch = nil historyBuilder.memEventsBatches = nil diff --git a/service/history/workflow/mutable_state_impl.go b/service/history/workflow/mutable_state_impl.go index eaa3eb02aee..a817612a23d 100644 --- a/service/history/workflow/mutable_state_impl.go +++ b/service/history/workflow/mutable_state_impl.go @@ -265,6 +265,7 @@ func NewMutableState( s.currentVersion, common.FirstEventID, s.bufferEventsInDB, + s.metricsHandler, ) s.taskGenerator = taskGeneratorProvider.NewTaskGenerator(shard, s) s.workflowTaskManager = newWorkflowTaskStateMachine(s) @@ -326,6 +327,7 @@ func newMutableStateFromDB( common.EmptyVersion, dbRecord.NextEventId, dbRecord.BufferedEvents, + mutableState.metricsHandler, ) mutableState.currentVersion = common.EmptyVersion @@ -527,6 +529,7 @@ func (ms *MutableStateImpl) UpdateCurrentVersion( ms.currentVersion, ms.nextEventIDInDB, ms.bufferEventsInDB, + ms.metricsHandler, ) return nil @@ -4140,6 +4143,7 @@ func (ms *MutableStateImpl) cleanupTransaction( ms.GetCurrentVersion(), ms.nextEventIDInDB, ms.bufferEventsInDB, + ms.metricsHandler, ) ms.InsertTasks = make(map[tasks.Category][]tasks.Task)