Skip to content

Commit

Permalink
Emit inordered buffered events metric
Browse files Browse the repository at this point in the history
  • Loading branch information
alexshtin committed Feb 14, 2023
1 parent 4b78d28 commit 4a6b59c
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 0 deletions.
1 change: 1 addition & 0 deletions common/metrics/metric_defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1474,6 +1474,7 @@ var (
ShardLockLatency = NewTimerDef("shard_lock_latency")
NamespaceRegistryLockLatency = NewTimerDef("namespace_registry_lock_latency")
ClosedWorkflowBufferEventCount = NewCounterDef("closed_workflow_buffer_event_counter")
InorderBufferedEventsCounter = NewCounterDef("inordered_buffered_events")

// Matching
MatchingClientForwardedCounter = NewCounterDef("forwarded")
Expand Down
50 changes: 50 additions & 0 deletions service/history/workflow/history_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"go.temporal.io/server/api/historyservice/v1"
"go.temporal.io/server/common"
"go.temporal.io/server/common/clock"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/primitives/timestamp"
)
Expand Down Expand Up @@ -90,6 +91,8 @@ type (

// scheduled to started event ID mapping
scheduledIDToStartedID map[int64]int64

metricsHandler metrics.Handler
}
)

Expand All @@ -99,6 +102,7 @@ func NewMutableHistoryBuilder(
version int64,
nextEventID int64,
dbBufferBatch []*historypb.HistoryEvent,
metricsHandler metrics.Handler,
) *HistoryBuilder {
return &HistoryBuilder{
state: HistoryBuilderStateMutable,
Expand All @@ -116,6 +120,8 @@ func NewMutableHistoryBuilder(
memLatestBatch: nil,
memBufferBatch: nil,
scheduledIDToStartedID: make(map[int64]int64),

metricsHandler: metricsHandler,
}
}

Expand All @@ -139,6 +145,8 @@ func NewImmutableHistoryBuilder(
memLatestBatch: history,
memBufferBatch: nil,
scheduledIDToStartedID: nil,

metricsHandler: nil,
}
}

Expand Down Expand Up @@ -1401,6 +1409,7 @@ func (b *HistoryBuilder) wireEventIDs(
func (b *HistoryBuilder) reorderBuffer(
bufferEvents []*historypb.HistoryEvent,
) []*historypb.HistoryEvent {
b.emitInorderedBufferedEvents(bufferEvents)
reorderBuffer := make([]*historypb.HistoryEvent, 0, len(bufferEvents))
reorderEvents := make([]*historypb.HistoryEvent, 0, len(bufferEvents))
for _, event := range bufferEvents {
Expand All @@ -1423,6 +1432,47 @@ func (b *HistoryBuilder) reorderBuffer(
return append(reorderEvents, reorderBuffer...)
}

func (b *HistoryBuilder) emitInorderedBufferedEvents(bufferedEvents []*historypb.HistoryEvent) {
completedActivities := make(map[int64]struct{})
completedChildWorkflows := make(map[int64]struct{})
var inorderedEventsCount int64
for _, event := range bufferedEvents {
switch event.GetEventType() {
case enumspb.EVENT_TYPE_ACTIVITY_TASK_STARTED:
if _, seenCompleted := completedActivities[event.GetEventId()]; seenCompleted {
inorderedEventsCount++
}
case enumspb.EVENT_TYPE_ACTIVITY_TASK_COMPLETED:
completedActivities[event.GetActivityTaskCompletedEventAttributes().GetStartedEventId()] = struct{}{}
case enumspb.EVENT_TYPE_ACTIVITY_TASK_FAILED:
completedActivities[event.GetActivityTaskFailedEventAttributes().GetStartedEventId()] = struct{}{}
case enumspb.EVENT_TYPE_ACTIVITY_TASK_TIMED_OUT:
completedActivities[event.GetActivityTaskTimedOutEventAttributes().GetStartedEventId()] = struct{}{}
case enumspb.EVENT_TYPE_ACTIVITY_TASK_CANCELED:
completedActivities[event.GetActivityTaskCanceledEventAttributes().GetStartedEventId()] = struct{}{}

case enumspb.EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_STARTED:
if _, seenCompleted := completedChildWorkflows[event.GetEventId()]; seenCompleted {
inorderedEventsCount++
}
case enumspb.EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_COMPLETED:
completedChildWorkflows[event.GetChildWorkflowExecutionCompletedEventAttributes().GetStartedEventId()] = struct{}{}
case enumspb.EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_FAILED:
completedChildWorkflows[event.GetChildWorkflowExecutionFailedEventAttributes().GetStartedEventId()] = struct{}{}
case enumspb.EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_TIMED_OUT:
completedChildWorkflows[event.GetChildWorkflowExecutionTimedOutEventAttributes().GetStartedEventId()] = struct{}{}
case enumspb.EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_CANCELED:
completedChildWorkflows[event.GetChildWorkflowExecutionCanceledEventAttributes().GetStartedEventId()] = struct{}{}
case enumspb.EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_TERMINATED:
completedChildWorkflows[event.GetChildWorkflowExecutionTerminatedEventAttributes().GetStartedEventId()] = struct{}{}
}
}

if inorderedEventsCount > 0 && b.metricsHandler != nil {
b.metricsHandler.Counter(metrics.InorderBufferedEventsCounter.GetMetricName()).Record(inorderedEventsCount)
}
}

func (b *HistoryBuilder) HasActivityFinishEvent(
scheduledEventID int64,
) bool {
Expand Down
4 changes: 4 additions & 0 deletions service/history/workflow/history_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
workflowspb "go.temporal.io/server/api/workflow/v1"
"go.temporal.io/server/common"
"go.temporal.io/server/common/clock"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/primitives/timestamp"
"go.temporal.io/server/service/history/tests"
Expand Down Expand Up @@ -154,6 +155,7 @@ func (s *historyBuilderSuite) SetupTest() {
s.version,
s.nextEventID,
nil,
metrics.NoopMetricsHandler,
)
}

Expand Down Expand Up @@ -2088,6 +2090,7 @@ func (s *historyBuilderSuite) testWireEventIDs(
s.version,
s.nextEventID,
nil,
metrics.NoopMetricsHandler,
)
s.historyBuilder.dbBufferBatch = []*historypb.HistoryEvent{startEvent}
s.historyBuilder.memEventsBatches = nil
Expand Down Expand Up @@ -2134,6 +2137,7 @@ func (s *historyBuilderSuite) TestHasBufferEvent() {
s.version,
s.nextEventID,
nil,
metrics.NoopMetricsHandler,
)
historyBuilder.dbBufferBatch = nil
historyBuilder.memEventsBatches = nil
Expand Down
4 changes: 4 additions & 0 deletions service/history/workflow/mutable_state_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,7 @@ func NewMutableState(
s.currentVersion,
common.FirstEventID,
s.bufferEventsInDB,
s.metricsHandler,
)
s.taskGenerator = taskGeneratorProvider.NewTaskGenerator(shard, s)
s.workflowTaskManager = newWorkflowTaskStateMachine(s)
Expand Down Expand Up @@ -326,6 +327,7 @@ func newMutableStateFromDB(
common.EmptyVersion,
dbRecord.NextEventId,
dbRecord.BufferedEvents,
mutableState.metricsHandler,
)

mutableState.currentVersion = common.EmptyVersion
Expand Down Expand Up @@ -527,6 +529,7 @@ func (ms *MutableStateImpl) UpdateCurrentVersion(
ms.currentVersion,
ms.nextEventIDInDB,
ms.bufferEventsInDB,
ms.metricsHandler,
)

return nil
Expand Down Expand Up @@ -4140,6 +4143,7 @@ func (ms *MutableStateImpl) cleanupTransaction(
ms.GetCurrentVersion(),
ms.nextEventIDInDB,
ms.bufferEventsInDB,
ms.metricsHandler,
)

ms.InsertTasks = make(map[tasks.Category][]tasks.Task)
Expand Down

0 comments on commit 4a6b59c

Please sign in to comment.