Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Track and emit metrics for entities in a single workflow #4065

Merged
merged 3 commits into from
Mar 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
311 changes: 263 additions & 48 deletions api/checksum/v1/message.pb.go

Large diffs are not rendered by default.

663 changes: 439 additions & 224 deletions api/persistence/v1/executions.pb.go

Large diffs are not rendered by default.

8 changes: 8 additions & 0 deletions common/metrics/metric_defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1403,14 +1403,22 @@ var (
ChildInfoSize = NewBytesHistogramDef("child_info_size")
RequestCancelInfoSize = NewBytesHistogramDef("request_cancel_info_size")
SignalInfoSize = NewBytesHistogramDef("signal_info_size")
SignalRequestIDSize = NewBytesHistogramDef("signal_request_id_size")
BufferedEventsSize = NewBytesHistogramDef("buffered_events_size")
ActivityInfoCount = NewDimensionlessHistogramDef("activity_info_count")
TimerInfoCount = NewDimensionlessHistogramDef("timer_info_count")
ChildInfoCount = NewDimensionlessHistogramDef("child_info_count")
SignalInfoCount = NewDimensionlessHistogramDef("signal_info_count")
RequestCancelInfoCount = NewDimensionlessHistogramDef("request_cancel_info_count")
SignalRequestIDCount = NewDimensionlessHistogramDef("signal_request_id_count")
BufferedEventsCount = NewDimensionlessHistogramDef("buffered_events_count")
TaskCount = NewDimensionlessHistogramDef("task_count")
TotalActivityCount = NewDimensionlessHistogramDef("total_activity_count")
TotalUserTimerCount = NewDimensionlessHistogramDef("total_user_timer_count")
TotalChildExecutionCount = NewDimensionlessHistogramDef("total_child_execution_count")
TotalRequestCancelExternalCount = NewDimensionlessHistogramDef("total_request_cancel_external_count")
TotalSignalExternalCount = NewDimensionlessHistogramDef("total_signal_external_count")
TotalSignalCount = NewDimensionlessHistogramDef("total_signal_count")
WorkflowRetryBackoffTimerCount = NewCounterDef("workflow_retry_backoff_timer")
WorkflowCronBackoffTimerCount = NewCounterDef("workflow_cron_backoff_timer")
WorkflowCleanupDeleteCount = NewCounterDef("workflow_cleanup_delete")
Expand Down
8 changes: 8 additions & 0 deletions common/persistence/dataInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -692,6 +692,14 @@ type (
SignalRequestIDCount int
BufferedEventsCount int
TaskCountByCategory map[string]int

// Total item count for various information captured within mutable state
TotalActivityCount int64
TotalUserTimerCount int64
TotalChildExecutionCount int64
TotalRequestCancelExternalCount int64
TotalSignalExternalCount int64
TotalSignalCount int64
}

HistoryStatistics struct {
Expand Down
2 changes: 1 addition & 1 deletion common/persistence/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ func (m *executionManagerImpl) GetWorkflowExecution(
newResponse := &GetWorkflowExecutionResponse{
State: state,
DBRecordVersion: response.DBRecordVersion,
MutableStateStats: *statusOfInternalWorkflow(response.State, nil),
MutableStateStats: *statusOfInternalWorkflow(response.State, state, nil),
}
return newResponse, nil
}
Expand Down
138 changes: 89 additions & 49 deletions common/persistence/size.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,39 +24,49 @@

package persistence

import "go.temporal.io/server/service/history/tasks"
import (
persistencespb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/service/history/tasks"
)

func statusOfInternalWorkflow(
state *InternalWorkflowMutableState,
internalState *InternalWorkflowMutableState,
state *persistencespb.WorkflowMutableState,
historyStatistics *HistoryStatistics,
) *MutableStateStatistics {
if state == nil {
if internalState == nil {
return nil
}

executionInfoSize := sizeOfBlob(state.ExecutionInfo)
executionStateSize := sizeOfBlob(state.ExecutionState)
executionInfoSize := sizeOfBlob(internalState.ExecutionInfo)
executionStateSize := sizeOfBlob(internalState.ExecutionState)

activityInfoCount := len(state.ActivityInfos)
activityInfoSize := sizeOfInt64BlobMap(state.ActivityInfos)
totalActivityCount := state.ExecutionInfo.ActivityCount
activityInfoCount := len(internalState.ActivityInfos)
activityInfoSize := sizeOfInt64BlobMap(internalState.ActivityInfos)

timerInfoCount := len(state.TimerInfos)
timerInfoSize := sizeOfStringBlobMap(state.TimerInfos)
totalUserTimerCount := state.ExecutionInfo.UserTimerCount
timerInfoCount := len(internalState.TimerInfos)
timerInfoSize := sizeOfStringBlobMap(internalState.TimerInfos)

childExecutionInfoCount := len(state.ChildExecutionInfos)
childExecutionInfoSize := sizeOfInt64BlobMap(state.ChildExecutionInfos)
totalChildExecutionCount := state.ExecutionInfo.ChildExecutionCount
childExecutionInfoCount := len(internalState.ChildExecutionInfos)
childExecutionInfoSize := sizeOfInt64BlobMap(internalState.ChildExecutionInfos)

requestCancelInfoCount := len(state.RequestCancelInfos)
requestCancelInfoSize := sizeOfInt64BlobMap(state.RequestCancelInfos)
totalRequestCancelExternalCount := state.ExecutionInfo.RequestCancelExternalCount
requestCancelInfoCount := len(internalState.RequestCancelInfos)
requestCancelInfoSize := sizeOfInt64BlobMap(internalState.RequestCancelInfos)

signalInfoCount := len(state.SignalInfos)
signalInfoSize := sizeOfInt64BlobMap(state.SignalInfos)
totalSignalExternalCount := state.ExecutionInfo.SignalExternalCount
signalInfoCount := len(internalState.SignalInfos)
signalInfoSize := sizeOfInt64BlobMap(internalState.SignalInfos)

signalRequestIDCount := len(state.SignalRequestedIDs)
signalRequestIDSize := sizeOfStringSlice(state.SignalRequestedIDs)
totalSignalCount := state.ExecutionInfo.SignalCount
signalRequestIDCount := len(internalState.SignalRequestedIDs)
signalRequestIDSize := sizeOfStringSlice(internalState.SignalRequestedIDs)

bufferedEventsCount := len(state.BufferedEvents)
bufferedEventsSize := sizeOfBlobSlice(state.BufferedEvents)
bufferedEventsCount := len(internalState.BufferedEvents)
bufferedEventsSize := sizeOfBlobSlice(internalState.BufferedEvents)

totalSize := executionInfoSize
totalSize += executionStateSize
Expand All @@ -75,23 +85,29 @@ func statusOfInternalWorkflow(
ExecutionInfoSize: executionInfoSize,
ExecutionStateSize: executionStateSize,

ActivityInfoSize: activityInfoSize,
ActivityInfoCount: activityInfoCount,
ActivityInfoSize: activityInfoSize,
ActivityInfoCount: activityInfoCount,
TotalActivityCount: totalActivityCount,

TimerInfoSize: timerInfoSize,
TimerInfoCount: timerInfoCount,
TimerInfoSize: timerInfoSize,
TimerInfoCount: timerInfoCount,
TotalUserTimerCount: totalUserTimerCount,

ChildInfoSize: childExecutionInfoSize,
ChildInfoCount: childExecutionInfoCount,
ChildInfoSize: childExecutionInfoSize,
ChildInfoCount: childExecutionInfoCount,
TotalChildExecutionCount: totalChildExecutionCount,

RequestCancelInfoSize: requestCancelInfoSize,
RequestCancelInfoCount: requestCancelInfoCount,
RequestCancelInfoSize: requestCancelInfoSize,
RequestCancelInfoCount: requestCancelInfoCount,
TotalRequestCancelExternalCount: totalRequestCancelExternalCount,

SignalInfoSize: signalInfoSize,
SignalInfoCount: signalInfoCount,
SignalInfoSize: signalInfoSize,
SignalInfoCount: signalInfoCount,
TotalSignalExternalCount: totalSignalExternalCount,

SignalRequestIDSize: signalRequestIDSize,
SignalRequestIDCount: signalRequestIDCount,
TotalSignalCount: totalSignalCount,

BufferedEventsSize: bufferedEventsSize,
BufferedEventsCount: bufferedEventsCount,
Expand All @@ -109,31 +125,37 @@ func statusOfInternalWorkflowMutation(
executionInfoSize := sizeOfBlob(mutation.ExecutionInfoBlob)
executionStateSize := sizeOfBlob(mutation.ExecutionStateBlob)

totalActivityCount := mutation.ExecutionInfo.ActivityCount
activityInfoCount := len(mutation.UpsertActivityInfos)
activityInfoCount += len(mutation.DeleteActivityInfos)
activityInfoSize := sizeOfInt64BlobMap(mutation.UpsertActivityInfos)
activityInfoSize += sizeOfInt64Set(mutation.DeleteActivityInfos)

totalUserTimerCount := mutation.ExecutionInfo.UserTimerCount
timerInfoCount := len(mutation.UpsertTimerInfos)
timerInfoCount += len(mutation.DeleteTimerInfos)
timerInfoSize := sizeOfStringBlobMap(mutation.UpsertTimerInfos)
timerInfoSize += sizeOfStringSet(mutation.DeleteTimerInfos)

totalChildExecutionCount := mutation.ExecutionInfo.ChildExecutionCount
childExecutionInfoCount := len(mutation.UpsertChildExecutionInfos)
childExecutionInfoCount += len(mutation.DeleteChildExecutionInfos)
childExecutionInfoSize := sizeOfInt64BlobMap(mutation.UpsertChildExecutionInfos)
childExecutionInfoSize += sizeOfInt64Set(mutation.DeleteChildExecutionInfos)

totalRequestCancelExternalCount := mutation.ExecutionInfo.RequestCancelExternalCount
requestCancelInfoCount := len(mutation.UpsertRequestCancelInfos)
requestCancelInfoCount += len(mutation.DeleteRequestCancelInfos)
requestCancelInfoSize := sizeOfInt64BlobMap(mutation.UpsertRequestCancelInfos)
requestCancelInfoSize += sizeOfInt64Set(mutation.DeleteRequestCancelInfos)

totalSignalExternalCount := mutation.ExecutionInfo.SignalExternalCount
signalInfoCount := len(mutation.UpsertSignalInfos)
signalInfoCount += len(mutation.DeleteSignalInfos)
signalInfoSize := sizeOfInt64BlobMap(mutation.UpsertSignalInfos)
signalInfoSize += sizeOfInt64Set(mutation.DeleteSignalInfos)

totalSignalCount := mutation.ExecutionInfo.SignalCount
signalRequestIDCount := len(mutation.UpsertSignalRequestedIDs)
signalRequestIDCount += len(mutation.DeleteSignalRequestedIDs)
signalRequestIDSize := sizeOfStringSet(mutation.UpsertSignalRequestedIDs)
Expand Down Expand Up @@ -167,23 +189,29 @@ func statusOfInternalWorkflowMutation(
ExecutionInfoSize: executionInfoSize,
ExecutionStateSize: executionStateSize,

ActivityInfoSize: activityInfoSize,
ActivityInfoCount: activityInfoCount,
ActivityInfoSize: activityInfoSize,
ActivityInfoCount: activityInfoCount,
TotalActivityCount: totalActivityCount,

TimerInfoSize: timerInfoSize,
TimerInfoCount: timerInfoCount,
TimerInfoSize: timerInfoSize,
TimerInfoCount: timerInfoCount,
TotalUserTimerCount: totalUserTimerCount,

ChildInfoSize: childExecutionInfoSize,
ChildInfoCount: childExecutionInfoCount,
ChildInfoSize: childExecutionInfoSize,
ChildInfoCount: childExecutionInfoCount,
TotalChildExecutionCount: totalChildExecutionCount,

RequestCancelInfoSize: requestCancelInfoSize,
RequestCancelInfoCount: requestCancelInfoCount,
RequestCancelInfoSize: requestCancelInfoSize,
RequestCancelInfoCount: requestCancelInfoCount,
TotalRequestCancelExternalCount: totalRequestCancelExternalCount,

SignalInfoSize: signalInfoSize,
SignalInfoCount: signalInfoCount,
SignalInfoSize: signalInfoSize,
SignalInfoCount: signalInfoCount,
TotalSignalExternalCount: totalSignalExternalCount,

SignalRequestIDSize: signalRequestIDSize,
SignalRequestIDCount: signalRequestIDCount,
TotalSignalCount: totalSignalCount,

BufferedEventsSize: bufferedEventsSize,
BufferedEventsCount: bufferedEventsCount,
Expand Down Expand Up @@ -211,21 +239,27 @@ func statusOfInternalWorkflowSnapshot(
executionInfoSize := sizeOfBlob(snapshot.ExecutionInfoBlob)
executionStateSize := sizeOfBlob(snapshot.ExecutionStateBlob)

totalActivityCount := snapshot.ExecutionInfo.ActivityCount
activityInfoCount := len(snapshot.ActivityInfos)
activityInfoSize := sizeOfInt64BlobMap(snapshot.ActivityInfos)

totalUserTimerCount := snapshot.ExecutionInfo.UserTimerCount
timerInfoCount := len(snapshot.TimerInfos)
timerInfoSize := sizeOfStringBlobMap(snapshot.TimerInfos)

totalChildExecutionCount := snapshot.ExecutionInfo.ChildExecutionCount
childExecutionInfoCount := len(snapshot.ChildExecutionInfos)
childExecutionInfoSize := sizeOfInt64BlobMap(snapshot.ChildExecutionInfos)

totalRequestCancelExternalCount := snapshot.ExecutionInfo.RequestCancelExternalCount
requestCancelInfoCount := len(snapshot.RequestCancelInfos)
requestCancelInfoSize := sizeOfInt64BlobMap(snapshot.RequestCancelInfos)

totalSignalExternalCount := snapshot.ExecutionInfo.SignalExternalCount
signalInfoCount := len(snapshot.SignalInfos)
signalInfoSize := sizeOfInt64BlobMap(snapshot.SignalInfos)

totalSignalCount := snapshot.ExecutionInfo.SignalCount
signalRequestIDCount := len(snapshot.SignalRequestedIDs)
signalRequestIDSize := sizeOfStringSet(snapshot.SignalRequestedIDs)

Expand All @@ -251,23 +285,29 @@ func statusOfInternalWorkflowSnapshot(
ExecutionInfoSize: executionInfoSize,
ExecutionStateSize: executionStateSize,

ActivityInfoSize: activityInfoSize,
ActivityInfoCount: activityInfoCount,
ActivityInfoSize: activityInfoSize,
ActivityInfoCount: activityInfoCount,
TotalActivityCount: totalActivityCount,

TimerInfoSize: timerInfoSize,
TimerInfoCount: timerInfoCount,
TimerInfoSize: timerInfoSize,
TimerInfoCount: timerInfoCount,
TotalUserTimerCount: totalUserTimerCount,

ChildInfoSize: childExecutionInfoSize,
ChildInfoCount: childExecutionInfoCount,
ChildInfoSize: childExecutionInfoSize,
ChildInfoCount: childExecutionInfoCount,
TotalChildExecutionCount: totalChildExecutionCount,

RequestCancelInfoSize: requestCancelInfoSize,
RequestCancelInfoCount: requestCancelInfoCount,
RequestCancelInfoSize: requestCancelInfoSize,
RequestCancelInfoCount: requestCancelInfoCount,
TotalRequestCancelExternalCount: totalRequestCancelExternalCount,

SignalInfoSize: signalInfoSize,
SignalInfoCount: signalInfoCount,
SignalInfoSize: signalInfoSize,
SignalInfoCount: signalInfoCount,
TotalSignalExternalCount: totalSignalExternalCount,

SignalRequestIDSize: signalRequestIDSize,
SignalRequestIDCount: signalRequestIDCount,
TotalSignalCount: totalSignalCount,

BufferedEventsSize: bufferedEventsSize,
BufferedEventsCount: bufferedEventsCount,
Expand Down
7 changes: 7 additions & 0 deletions proto/internal/temporal/server/api/checksum/v1/message.proto
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,13 @@ message MutableStateChecksumPayload {
int64 last_first_event_id = 6;
int64 next_event_id = 7;
int64 last_processed_event_id = 8;

int64 signal_count = 9;
int64 activity_count = 21;
int64 child_execution_count = 22;
int64 user_timer_count = 23;
int64 request_cancel_external_count = 24;
int64 signal_external_count = 25;

int32 workflow_task_attempt = 10;
int64 workflow_task_version = 11;
Expand All @@ -54,4 +60,5 @@ message MutableStateChecksumPayload {

string sticky_task_queue_name = 19;
temporal.server.api.history.v1.VersionHistories version_histories = 20;

}
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,11 @@ message WorkflowExecutionInfo {
reserved 44;
reserved 45;
int64 signal_count = 46;
int64 activity_count = 71;
int64 child_execution_count = 72;
int64 user_timer_count = 73;
int64 request_cancel_external_count = 74;
int64 signal_external_count = 75;
reserved 47;
reserved 48;
reserved 49;
Expand Down
5 changes: 5 additions & 0 deletions service/history/workflow/checksum.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ func newMutableStateChecksumPayload(ms MutableState) *checksumspb.MutableStateCh
LastFirstEventId: executionInfo.LastFirstEventId,
NextEventId: ms.GetNextEventID(),
LastProcessedEventId: executionInfo.LastWorkflowTaskStartedEventId,
ActivityCount: executionInfo.ActivityCount,
ChildExecutionCount: executionInfo.ChildExecutionCount,
UserTimerCount: executionInfo.UserTimerCount,
RequestCancelExternalCount: executionInfo.RequestCancelExternalCount,
SignalExternalCount: executionInfo.SignalExternalCount,
SignalCount: executionInfo.SignalCount,
WorkflowTaskAttempt: executionInfo.WorkflowTaskAttempt,
WorkflowTaskScheduledEventId: executionInfo.WorkflowTaskScheduledEventId,
Expand Down
8 changes: 8 additions & 0 deletions service/history/workflow/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,22 @@ func emitMutableStateStatus(
metricsHandler.Histogram(metrics.ExecutionStateSize.GetMetricName(), metrics.ExecutionStateSize.GetMetricUnit()).Record(int64(stats.ExecutionStateSize))
metricsHandler.Histogram(metrics.ActivityInfoSize.GetMetricName(), metrics.ActivityInfoSize.GetMetricUnit()).Record(int64(stats.ActivityInfoSize))
metricsHandler.Histogram(metrics.ActivityInfoCount.GetMetricName(), metrics.ActivityInfoCount.GetMetricUnit()).Record(int64(stats.ActivityInfoCount))
metricsHandler.Histogram(metrics.TotalActivityCount.GetMetricName(), metrics.TotalActivityCount.GetMetricUnit()).Record(stats.TotalActivityCount)
metricsHandler.Histogram(metrics.TimerInfoSize.GetMetricName(), metrics.TimerInfoSize.GetMetricUnit()).Record(int64(stats.TimerInfoSize))
metricsHandler.Histogram(metrics.TimerInfoCount.GetMetricName(), metrics.TimerInfoCount.GetMetricUnit()).Record(int64(stats.TimerInfoCount))
metricsHandler.Histogram(metrics.TotalUserTimerCount.GetMetricName(), metrics.TotalUserTimerCount.GetMetricUnit()).Record(stats.TotalUserTimerCount)
metricsHandler.Histogram(metrics.ChildInfoSize.GetMetricName(), metrics.ChildInfoSize.GetMetricUnit()).Record(int64(stats.ChildInfoSize))
metricsHandler.Histogram(metrics.ChildInfoCount.GetMetricName(), metrics.ChildInfoCount.GetMetricUnit()).Record(int64(stats.ChildInfoCount))
metricsHandler.Histogram(metrics.TotalChildExecutionCount.GetMetricName(), metrics.TotalChildExecutionCount.GetMetricUnit()).Record(stats.TotalChildExecutionCount)
metricsHandler.Histogram(metrics.RequestCancelInfoSize.GetMetricName(), metrics.RequestCancelInfoSize.GetMetricUnit()).Record(int64(stats.RequestCancelInfoSize))
metricsHandler.Histogram(metrics.RequestCancelInfoCount.GetMetricName(), metrics.RequestCancelInfoCount.GetMetricUnit()).Record(int64(stats.RequestCancelInfoCount))
metricsHandler.Histogram(metrics.TotalRequestCancelExternalCount.GetMetricName(), metrics.TotalRequestCancelExternalCount.GetMetricUnit()).Record(stats.TotalRequestCancelExternalCount)
metricsHandler.Histogram(metrics.SignalInfoSize.GetMetricName(), metrics.SignalInfoSize.GetMetricUnit()).Record(int64(stats.SignalInfoSize))
metricsHandler.Histogram(metrics.SignalInfoCount.GetMetricName(), metrics.SignalInfoCount.GetMetricUnit()).Record(int64(stats.SignalInfoCount))
metricsHandler.Histogram(metrics.TotalSignalExternalCount.GetMetricName(), metrics.TotalSignalExternalCount.GetMetricUnit()).Record(stats.TotalSignalExternalCount)
metricsHandler.Histogram(metrics.SignalRequestIDSize.GetMetricName(), metrics.SignalRequestIDSize.GetMetricUnit()).Record(int64(stats.SignalRequestIDSize))
metricsHandler.Histogram(metrics.SignalRequestIDCount.GetMetricName(), metrics.SignalRequestIDCount.GetMetricUnit()).Record(int64(stats.SignalRequestIDCount))
metricsHandler.Histogram(metrics.TotalSignalCount.GetMetricName(), metrics.TotalSignalCount.GetMetricUnit()).Record(stats.TotalSignalCount)
metricsHandler.Histogram(metrics.BufferedEventsSize.GetMetricName(), metrics.BufferedEventsSize.GetMetricUnit()).Record(int64(stats.BufferedEventsSize))
metricsHandler.Histogram(metrics.BufferedEventsCount.GetMetricName(), metrics.BufferedEventsCount.GetMetricUnit()).Record(int64(stats.BufferedEventsCount))

Expand Down
Loading