Skip to content

Commit

Permalink
Track and emit metrics for entities in a single workflow (#4065)
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt committed Mar 24, 2023
1 parent 145c99b commit a8b591d
Show file tree
Hide file tree
Showing 12 changed files with 916 additions and 322 deletions.
311 changes: 263 additions & 48 deletions api/checksum/v1/message.pb.go

Large diffs are not rendered by default.

663 changes: 439 additions & 224 deletions api/persistence/v1/executions.pb.go

Large diffs are not rendered by default.

8 changes: 8 additions & 0 deletions common/metrics/metric_defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1413,14 +1413,22 @@ var (
ChildInfoSize = NewBytesHistogramDef("child_info_size")
RequestCancelInfoSize = NewBytesHistogramDef("request_cancel_info_size")
SignalInfoSize = NewBytesHistogramDef("signal_info_size")
SignalRequestIDSize = NewBytesHistogramDef("signal_request_id_size")
BufferedEventsSize = NewBytesHistogramDef("buffered_events_size")
ActivityInfoCount = NewDimensionlessHistogramDef("activity_info_count")
TimerInfoCount = NewDimensionlessHistogramDef("timer_info_count")
ChildInfoCount = NewDimensionlessHistogramDef("child_info_count")
SignalInfoCount = NewDimensionlessHistogramDef("signal_info_count")
RequestCancelInfoCount = NewDimensionlessHistogramDef("request_cancel_info_count")
SignalRequestIDCount = NewDimensionlessHistogramDef("signal_request_id_count")
BufferedEventsCount = NewDimensionlessHistogramDef("buffered_events_count")
TaskCount = NewDimensionlessHistogramDef("task_count")
TotalActivityCount = NewDimensionlessHistogramDef("total_activity_count")
TotalUserTimerCount = NewDimensionlessHistogramDef("total_user_timer_count")
TotalChildExecutionCount = NewDimensionlessHistogramDef("total_child_execution_count")
TotalRequestCancelExternalCount = NewDimensionlessHistogramDef("total_request_cancel_external_count")
TotalSignalExternalCount = NewDimensionlessHistogramDef("total_signal_external_count")
TotalSignalCount = NewDimensionlessHistogramDef("total_signal_count")
WorkflowRetryBackoffTimerCount = NewCounterDef("workflow_retry_backoff_timer")
WorkflowCronBackoffTimerCount = NewCounterDef("workflow_cron_backoff_timer")
WorkflowCleanupDeleteCount = NewCounterDef("workflow_cleanup_delete")
Expand Down
8 changes: 8 additions & 0 deletions common/persistence/dataInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -680,6 +680,14 @@ type (
SignalRequestIDCount int
BufferedEventsCount int
TaskCountByCategory map[string]int

// Total item count for various information captured within mutable state
TotalActivityCount int64
TotalUserTimerCount int64
TotalChildExecutionCount int64
TotalRequestCancelExternalCount int64
TotalSignalExternalCount int64
TotalSignalCount int64
}

HistoryStatistics struct {
Expand Down
2 changes: 1 addition & 1 deletion common/persistence/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ func (m *executionManagerImpl) GetWorkflowExecution(
newResponse := &GetWorkflowExecutionResponse{
State: state,
DBRecordVersion: response.DBRecordVersion,
MutableStateStats: *statusOfInternalWorkflow(response.State, nil),
MutableStateStats: *statusOfInternalWorkflow(response.State, state, nil),
}
return newResponse, nil
}
Expand Down
138 changes: 89 additions & 49 deletions common/persistence/size.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,39 +24,49 @@

package persistence

import "go.temporal.io/server/service/history/tasks"
import (
persistencespb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/service/history/tasks"
)

func statusOfInternalWorkflow(
state *InternalWorkflowMutableState,
internalState *InternalWorkflowMutableState,
state *persistencespb.WorkflowMutableState,
historyStatistics *HistoryStatistics,
) *MutableStateStatistics {
if state == nil {
if internalState == nil {
return nil
}

executionInfoSize := sizeOfBlob(state.ExecutionInfo)
executionStateSize := sizeOfBlob(state.ExecutionState)
executionInfoSize := sizeOfBlob(internalState.ExecutionInfo)
executionStateSize := sizeOfBlob(internalState.ExecutionState)

activityInfoCount := len(state.ActivityInfos)
activityInfoSize := sizeOfInt64BlobMap(state.ActivityInfos)
totalActivityCount := state.ExecutionInfo.ActivityCount
activityInfoCount := len(internalState.ActivityInfos)
activityInfoSize := sizeOfInt64BlobMap(internalState.ActivityInfos)

timerInfoCount := len(state.TimerInfos)
timerInfoSize := sizeOfStringBlobMap(state.TimerInfos)
totalUserTimerCount := state.ExecutionInfo.UserTimerCount
timerInfoCount := len(internalState.TimerInfos)
timerInfoSize := sizeOfStringBlobMap(internalState.TimerInfos)

childExecutionInfoCount := len(state.ChildExecutionInfos)
childExecutionInfoSize := sizeOfInt64BlobMap(state.ChildExecutionInfos)
totalChildExecutionCount := state.ExecutionInfo.ChildExecutionCount
childExecutionInfoCount := len(internalState.ChildExecutionInfos)
childExecutionInfoSize := sizeOfInt64BlobMap(internalState.ChildExecutionInfos)

requestCancelInfoCount := len(state.RequestCancelInfos)
requestCancelInfoSize := sizeOfInt64BlobMap(state.RequestCancelInfos)
totalRequestCancelExternalCount := state.ExecutionInfo.RequestCancelExternalCount
requestCancelInfoCount := len(internalState.RequestCancelInfos)
requestCancelInfoSize := sizeOfInt64BlobMap(internalState.RequestCancelInfos)

signalInfoCount := len(state.SignalInfos)
signalInfoSize := sizeOfInt64BlobMap(state.SignalInfos)
totalSignalExternalCount := state.ExecutionInfo.SignalExternalCount
signalInfoCount := len(internalState.SignalInfos)
signalInfoSize := sizeOfInt64BlobMap(internalState.SignalInfos)

signalRequestIDCount := len(state.SignalRequestedIDs)
signalRequestIDSize := sizeOfStringSlice(state.SignalRequestedIDs)
totalSignalCount := state.ExecutionInfo.SignalCount
signalRequestIDCount := len(internalState.SignalRequestedIDs)
signalRequestIDSize := sizeOfStringSlice(internalState.SignalRequestedIDs)

bufferedEventsCount := len(state.BufferedEvents)
bufferedEventsSize := sizeOfBlobSlice(state.BufferedEvents)
bufferedEventsCount := len(internalState.BufferedEvents)
bufferedEventsSize := sizeOfBlobSlice(internalState.BufferedEvents)

totalSize := executionInfoSize
totalSize += executionStateSize
Expand All @@ -75,23 +85,29 @@ func statusOfInternalWorkflow(
ExecutionInfoSize: executionInfoSize,
ExecutionStateSize: executionStateSize,

ActivityInfoSize: activityInfoSize,
ActivityInfoCount: activityInfoCount,
ActivityInfoSize: activityInfoSize,
ActivityInfoCount: activityInfoCount,
TotalActivityCount: totalActivityCount,

TimerInfoSize: timerInfoSize,
TimerInfoCount: timerInfoCount,
TimerInfoSize: timerInfoSize,
TimerInfoCount: timerInfoCount,
TotalUserTimerCount: totalUserTimerCount,

ChildInfoSize: childExecutionInfoSize,
ChildInfoCount: childExecutionInfoCount,
ChildInfoSize: childExecutionInfoSize,
ChildInfoCount: childExecutionInfoCount,
TotalChildExecutionCount: totalChildExecutionCount,

RequestCancelInfoSize: requestCancelInfoSize,
RequestCancelInfoCount: requestCancelInfoCount,
RequestCancelInfoSize: requestCancelInfoSize,
RequestCancelInfoCount: requestCancelInfoCount,
TotalRequestCancelExternalCount: totalRequestCancelExternalCount,

SignalInfoSize: signalInfoSize,
SignalInfoCount: signalInfoCount,
SignalInfoSize: signalInfoSize,
SignalInfoCount: signalInfoCount,
TotalSignalExternalCount: totalSignalExternalCount,

SignalRequestIDSize: signalRequestIDSize,
SignalRequestIDCount: signalRequestIDCount,
TotalSignalCount: totalSignalCount,

BufferedEventsSize: bufferedEventsSize,
BufferedEventsCount: bufferedEventsCount,
Expand All @@ -109,31 +125,37 @@ func statusOfInternalWorkflowMutation(
executionInfoSize := sizeOfBlob(mutation.ExecutionInfoBlob)
executionStateSize := sizeOfBlob(mutation.ExecutionStateBlob)

totalActivityCount := mutation.ExecutionInfo.ActivityCount
activityInfoCount := len(mutation.UpsertActivityInfos)
activityInfoCount += len(mutation.DeleteActivityInfos)
activityInfoSize := sizeOfInt64BlobMap(mutation.UpsertActivityInfos)
activityInfoSize += sizeOfInt64Set(mutation.DeleteActivityInfos)

totalUserTimerCount := mutation.ExecutionInfo.UserTimerCount
timerInfoCount := len(mutation.UpsertTimerInfos)
timerInfoCount += len(mutation.DeleteTimerInfos)
timerInfoSize := sizeOfStringBlobMap(mutation.UpsertTimerInfos)
timerInfoSize += sizeOfStringSet(mutation.DeleteTimerInfos)

totalChildExecutionCount := mutation.ExecutionInfo.ChildExecutionCount
childExecutionInfoCount := len(mutation.UpsertChildExecutionInfos)
childExecutionInfoCount += len(mutation.DeleteChildExecutionInfos)
childExecutionInfoSize := sizeOfInt64BlobMap(mutation.UpsertChildExecutionInfos)
childExecutionInfoSize += sizeOfInt64Set(mutation.DeleteChildExecutionInfos)

totalRequestCancelExternalCount := mutation.ExecutionInfo.RequestCancelExternalCount
requestCancelInfoCount := len(mutation.UpsertRequestCancelInfos)
requestCancelInfoCount += len(mutation.DeleteRequestCancelInfos)
requestCancelInfoSize := sizeOfInt64BlobMap(mutation.UpsertRequestCancelInfos)
requestCancelInfoSize += sizeOfInt64Set(mutation.DeleteRequestCancelInfos)

totalSignalExternalCount := mutation.ExecutionInfo.SignalExternalCount
signalInfoCount := len(mutation.UpsertSignalInfos)
signalInfoCount += len(mutation.DeleteSignalInfos)
signalInfoSize := sizeOfInt64BlobMap(mutation.UpsertSignalInfos)
signalInfoSize += sizeOfInt64Set(mutation.DeleteSignalInfos)

totalSignalCount := mutation.ExecutionInfo.SignalCount
signalRequestIDCount := len(mutation.UpsertSignalRequestedIDs)
signalRequestIDCount += len(mutation.DeleteSignalRequestedIDs)
signalRequestIDSize := sizeOfStringSet(mutation.UpsertSignalRequestedIDs)
Expand Down Expand Up @@ -167,23 +189,29 @@ func statusOfInternalWorkflowMutation(
ExecutionInfoSize: executionInfoSize,
ExecutionStateSize: executionStateSize,

ActivityInfoSize: activityInfoSize,
ActivityInfoCount: activityInfoCount,
ActivityInfoSize: activityInfoSize,
ActivityInfoCount: activityInfoCount,
TotalActivityCount: totalActivityCount,

TimerInfoSize: timerInfoSize,
TimerInfoCount: timerInfoCount,
TimerInfoSize: timerInfoSize,
TimerInfoCount: timerInfoCount,
TotalUserTimerCount: totalUserTimerCount,

ChildInfoSize: childExecutionInfoSize,
ChildInfoCount: childExecutionInfoCount,
ChildInfoSize: childExecutionInfoSize,
ChildInfoCount: childExecutionInfoCount,
TotalChildExecutionCount: totalChildExecutionCount,

RequestCancelInfoSize: requestCancelInfoSize,
RequestCancelInfoCount: requestCancelInfoCount,
RequestCancelInfoSize: requestCancelInfoSize,
RequestCancelInfoCount: requestCancelInfoCount,
TotalRequestCancelExternalCount: totalRequestCancelExternalCount,

SignalInfoSize: signalInfoSize,
SignalInfoCount: signalInfoCount,
SignalInfoSize: signalInfoSize,
SignalInfoCount: signalInfoCount,
TotalSignalExternalCount: totalSignalExternalCount,

SignalRequestIDSize: signalRequestIDSize,
SignalRequestIDCount: signalRequestIDCount,
TotalSignalCount: totalSignalCount,

BufferedEventsSize: bufferedEventsSize,
BufferedEventsCount: bufferedEventsCount,
Expand Down Expand Up @@ -211,21 +239,27 @@ func statusOfInternalWorkflowSnapshot(
executionInfoSize := sizeOfBlob(snapshot.ExecutionInfoBlob)
executionStateSize := sizeOfBlob(snapshot.ExecutionStateBlob)

totalActivityCount := snapshot.ExecutionInfo.ActivityCount
activityInfoCount := len(snapshot.ActivityInfos)
activityInfoSize := sizeOfInt64BlobMap(snapshot.ActivityInfos)

totalUserTimerCount := snapshot.ExecutionInfo.UserTimerCount
timerInfoCount := len(snapshot.TimerInfos)
timerInfoSize := sizeOfStringBlobMap(snapshot.TimerInfos)

totalChildExecutionCount := snapshot.ExecutionInfo.ChildExecutionCount
childExecutionInfoCount := len(snapshot.ChildExecutionInfos)
childExecutionInfoSize := sizeOfInt64BlobMap(snapshot.ChildExecutionInfos)

totalRequestCancelExternalCount := snapshot.ExecutionInfo.RequestCancelExternalCount
requestCancelInfoCount := len(snapshot.RequestCancelInfos)
requestCancelInfoSize := sizeOfInt64BlobMap(snapshot.RequestCancelInfos)

totalSignalExternalCount := snapshot.ExecutionInfo.SignalExternalCount
signalInfoCount := len(snapshot.SignalInfos)
signalInfoSize := sizeOfInt64BlobMap(snapshot.SignalInfos)

totalSignalCount := snapshot.ExecutionInfo.SignalCount
signalRequestIDCount := len(snapshot.SignalRequestedIDs)
signalRequestIDSize := sizeOfStringSet(snapshot.SignalRequestedIDs)

Expand All @@ -251,23 +285,29 @@ func statusOfInternalWorkflowSnapshot(
ExecutionInfoSize: executionInfoSize,
ExecutionStateSize: executionStateSize,

ActivityInfoSize: activityInfoSize,
ActivityInfoCount: activityInfoCount,
ActivityInfoSize: activityInfoSize,
ActivityInfoCount: activityInfoCount,
TotalActivityCount: totalActivityCount,

TimerInfoSize: timerInfoSize,
TimerInfoCount: timerInfoCount,
TimerInfoSize: timerInfoSize,
TimerInfoCount: timerInfoCount,
TotalUserTimerCount: totalUserTimerCount,

ChildInfoSize: childExecutionInfoSize,
ChildInfoCount: childExecutionInfoCount,
ChildInfoSize: childExecutionInfoSize,
ChildInfoCount: childExecutionInfoCount,
TotalChildExecutionCount: totalChildExecutionCount,

RequestCancelInfoSize: requestCancelInfoSize,
RequestCancelInfoCount: requestCancelInfoCount,
RequestCancelInfoSize: requestCancelInfoSize,
RequestCancelInfoCount: requestCancelInfoCount,
TotalRequestCancelExternalCount: totalRequestCancelExternalCount,

SignalInfoSize: signalInfoSize,
SignalInfoCount: signalInfoCount,
SignalInfoSize: signalInfoSize,
SignalInfoCount: signalInfoCount,
TotalSignalExternalCount: totalSignalExternalCount,

SignalRequestIDSize: signalRequestIDSize,
SignalRequestIDCount: signalRequestIDCount,
TotalSignalCount: totalSignalCount,

BufferedEventsSize: bufferedEventsSize,
BufferedEventsCount: bufferedEventsCount,
Expand Down
7 changes: 7 additions & 0 deletions proto/internal/temporal/server/api/checksum/v1/message.proto
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,13 @@ message MutableStateChecksumPayload {
int64 last_first_event_id = 6;
int64 next_event_id = 7;
int64 last_processed_event_id = 8;

int64 signal_count = 9;
int64 activity_count = 21;
int64 child_execution_count = 22;
int64 user_timer_count = 23;
int64 request_cancel_external_count = 24;
int64 signal_external_count = 25;

int32 workflow_task_attempt = 10;
int64 workflow_task_version = 11;
Expand All @@ -54,4 +60,5 @@ message MutableStateChecksumPayload {

string sticky_task_queue_name = 19;
temporal.server.api.history.v1.VersionHistories version_histories = 20;

}
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,11 @@ message WorkflowExecutionInfo {
reserved 44;
reserved 45;
int64 signal_count = 46;
int64 activity_count = 71;
int64 child_execution_count = 72;
int64 user_timer_count = 73;
int64 request_cancel_external_count = 74;
int64 signal_external_count = 75;
reserved 47;
reserved 48;
reserved 49;
Expand Down
5 changes: 5 additions & 0 deletions service/history/workflow/checksum.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ func newMutableStateChecksumPayload(ms MutableState) *checksumspb.MutableStateCh
LastFirstEventId: executionInfo.LastFirstEventId,
NextEventId: ms.GetNextEventID(),
LastProcessedEventId: executionInfo.LastWorkflowTaskStartedEventId,
ActivityCount: executionInfo.ActivityCount,
ChildExecutionCount: executionInfo.ChildExecutionCount,
UserTimerCount: executionInfo.UserTimerCount,
RequestCancelExternalCount: executionInfo.RequestCancelExternalCount,
SignalExternalCount: executionInfo.SignalExternalCount,
SignalCount: executionInfo.SignalCount,
WorkflowTaskAttempt: executionInfo.WorkflowTaskAttempt,
WorkflowTaskScheduledEventId: executionInfo.WorkflowTaskScheduledEventId,
Expand Down
8 changes: 8 additions & 0 deletions service/history/workflow/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,22 @@ func emitMutableStateStatus(
metricsHandler.Histogram(metrics.ExecutionStateSize.GetMetricName(), metrics.ExecutionStateSize.GetMetricUnit()).Record(int64(stats.ExecutionStateSize))
metricsHandler.Histogram(metrics.ActivityInfoSize.GetMetricName(), metrics.ActivityInfoSize.GetMetricUnit()).Record(int64(stats.ActivityInfoSize))
metricsHandler.Histogram(metrics.ActivityInfoCount.GetMetricName(), metrics.ActivityInfoCount.GetMetricUnit()).Record(int64(stats.ActivityInfoCount))
metricsHandler.Histogram(metrics.TotalActivityCount.GetMetricName(), metrics.TotalActivityCount.GetMetricUnit()).Record(stats.TotalActivityCount)
metricsHandler.Histogram(metrics.TimerInfoSize.GetMetricName(), metrics.TimerInfoSize.GetMetricUnit()).Record(int64(stats.TimerInfoSize))
metricsHandler.Histogram(metrics.TimerInfoCount.GetMetricName(), metrics.TimerInfoCount.GetMetricUnit()).Record(int64(stats.TimerInfoCount))
metricsHandler.Histogram(metrics.TotalUserTimerCount.GetMetricName(), metrics.TotalUserTimerCount.GetMetricUnit()).Record(stats.TotalUserTimerCount)
metricsHandler.Histogram(metrics.ChildInfoSize.GetMetricName(), metrics.ChildInfoSize.GetMetricUnit()).Record(int64(stats.ChildInfoSize))
metricsHandler.Histogram(metrics.ChildInfoCount.GetMetricName(), metrics.ChildInfoCount.GetMetricUnit()).Record(int64(stats.ChildInfoCount))
metricsHandler.Histogram(metrics.TotalChildExecutionCount.GetMetricName(), metrics.TotalChildExecutionCount.GetMetricUnit()).Record(stats.TotalChildExecutionCount)
metricsHandler.Histogram(metrics.RequestCancelInfoSize.GetMetricName(), metrics.RequestCancelInfoSize.GetMetricUnit()).Record(int64(stats.RequestCancelInfoSize))
metricsHandler.Histogram(metrics.RequestCancelInfoCount.GetMetricName(), metrics.RequestCancelInfoCount.GetMetricUnit()).Record(int64(stats.RequestCancelInfoCount))
metricsHandler.Histogram(metrics.TotalRequestCancelExternalCount.GetMetricName(), metrics.TotalRequestCancelExternalCount.GetMetricUnit()).Record(stats.TotalRequestCancelExternalCount)
metricsHandler.Histogram(metrics.SignalInfoSize.GetMetricName(), metrics.SignalInfoSize.GetMetricUnit()).Record(int64(stats.SignalInfoSize))
metricsHandler.Histogram(metrics.SignalInfoCount.GetMetricName(), metrics.SignalInfoCount.GetMetricUnit()).Record(int64(stats.SignalInfoCount))
metricsHandler.Histogram(metrics.TotalSignalExternalCount.GetMetricName(), metrics.TotalSignalExternalCount.GetMetricUnit()).Record(stats.TotalSignalExternalCount)
metricsHandler.Histogram(metrics.SignalRequestIDSize.GetMetricName(), metrics.SignalRequestIDSize.GetMetricUnit()).Record(int64(stats.SignalRequestIDSize))
metricsHandler.Histogram(metrics.SignalRequestIDCount.GetMetricName(), metrics.SignalRequestIDCount.GetMetricUnit()).Record(int64(stats.SignalRequestIDCount))
metricsHandler.Histogram(metrics.TotalSignalCount.GetMetricName(), metrics.TotalSignalCount.GetMetricUnit()).Record(stats.TotalSignalCount)
metricsHandler.Histogram(metrics.BufferedEventsSize.GetMetricName(), metrics.BufferedEventsSize.GetMetricUnit()).Record(int64(stats.BufferedEventsSize))
metricsHandler.Histogram(metrics.BufferedEventsCount.GetMetricName(), metrics.BufferedEventsCount.GetMetricUnit()).Record(int64(stats.BufferedEventsCount))

Expand Down
Loading

0 comments on commit a8b591d

Please sign in to comment.