diff --git a/common/aggregate/persistence_health_signal_aggregator.go b/common/aggregate/persistence_health_signal_aggregator.go index 939f58e4b1d..b21ff056cb4 100644 --- a/common/aggregate/persistence_health_signal_aggregator.go +++ b/common/aggregate/persistence_health_signal_aggregator.go @@ -29,12 +29,14 @@ import ( "time" "go.temporal.io/server/common/dynamicconfig" + "go.temporal.io/server/common/metrics" "go.temporal.io/server/common/quotas" ) type ( - PersistenceHealthSignalAggregator[K comparable] struct { + PersistenceHealthSignalAggregator[K SignalKey] struct { SignalAggregator[quotas.Request] + keyMapper SignalKeyMapperFn[quotas.Request, K] latencyAverages map[K]MovingWindowAverage @@ -45,6 +47,9 @@ type ( windowSize time.Duration maxBufferSize int + + metricsHandler metrics.Handler + emitMetricsTimer *time.Ticker } perShardPerNsHealthSignalKey struct { @@ -53,28 +58,35 @@ type ( } ) -func NewPersistenceHealthSignalAggregator[K comparable]( +func NewPersistenceHealthSignalAggregator[K SignalKey]( keyMapper SignalKeyMapperFn[quotas.Request, K], windowSize time.Duration, maxBufferSize int, + metricsHandler metrics.Handler, ) *PersistenceHealthSignalAggregator[K] { - return &PersistenceHealthSignalAggregator[K]{ - keyMapper: keyMapper, - latencyAverages: make(map[K]MovingWindowAverage), - errorRatios: make(map[K]MovingWindowAverage), - windowSize: windowSize, - maxBufferSize: maxBufferSize, + ret := &PersistenceHealthSignalAggregator[K]{ + keyMapper: keyMapper, + latencyAverages: make(map[K]MovingWindowAverage), + errorRatios: make(map[K]MovingWindowAverage), + windowSize: windowSize, + maxBufferSize: maxBufferSize, + metricsHandler: metricsHandler, + emitMetricsTimer: time.NewTicker(windowSize), } + go ret.emitMetricsLoop() + return ret } func NewPerShardPerNsHealthSignalAggregator( windowSize dynamicconfig.DurationPropertyFn, maxBufferSize dynamicconfig.IntPropertyFn, + metricsHandler metrics.Handler, ) *PersistenceHealthSignalAggregator[perShardPerNsHealthSignalKey] { return NewPersistenceHealthSignalAggregator[perShardPerNsHealthSignalKey]( perShardPerNsKeyMapperFn, windowSize(), maxBufferSize(), + metricsHandler, ) } @@ -85,6 +97,12 @@ func perShardPerNsKeyMapperFn(req quotas.Request) perShardPerNsHealthSignalKey { } } +func (k perShardPerNsHealthSignalKey) GetMetricTags() []metrics.Tag { + nsTag := metrics.NamespaceTag(k.namespace) + shardTag := metrics.ShardTag(k.shardID) + return []metrics.Tag{nsTag, shardTag} +} + func (s *PersistenceHealthSignalAggregator[_]) GetRecordFn(req quotas.Request) func(err error) { start := time.Now() return func(err error) { @@ -141,3 +159,17 @@ func (s *PersistenceHealthSignalAggregator[K]) getOrInitAverage( (*averages)[key] = newAvg return newAvg } + +func (s *PersistenceHealthSignalAggregator[_]) emitMetricsLoop() { + for { + select { + case <-s.emitMetricsTimer.C: + for key, avg := range s.latencyAverages { + s.metricsHandler.Gauge(metrics.PersistenceAvgLatencyPerShardPerNamespace.GetMetricName()).Record(avg.Average(), key.GetMetricTags()...) + } + for key, ratio := range s.errorRatios { + s.metricsHandler.Gauge(metrics.PersistenceErrPerShardPerNamespace.GetMetricName()).Record(ratio.Average(), key.GetMetricTags()...) + } + } + } +} diff --git a/common/aggregate/signal_aggregator.go b/common/aggregate/signal_aggregator.go index ede2166e5fa..83951cb75f5 100644 --- a/common/aggregate/signal_aggregator.go +++ b/common/aggregate/signal_aggregator.go @@ -24,12 +24,19 @@ package aggregate +import "go.temporal.io/server/common/metrics" + type ( - SignalKeyMapperFn[T any, K comparable] func(signalOrigin T) K + SignalKey interface { + comparable + GetMetricTags() []metrics.Tag + } + + SignalKeyMapperFn[T any, K SignalKey] func(origin T) K SignalAggregator[T any] interface { - GetRecordFn(key T) func(err error) - AverageLatency(key T) float64 - ErrorRatio(key T) float64 + GetRecordFn(origin T) func(err error) + AverageLatency(origin T) float64 + ErrorRatio(origin T) float64 } ) diff --git a/common/metrics/metric_defs.go b/common/metrics/metric_defs.go index 7b1715eab43..9f6025008b9 100644 --- a/common/metrics/metric_defs.go +++ b/common/metrics/metric_defs.go @@ -1663,4 +1663,6 @@ var ( VisibilityPersistenceFailures = NewCounterDef("visibility_persistence_errors") VisibilityPersistenceResourceExhausted = NewCounterDef("visibility_persistence_resource_exhausted") VisibilityPersistenceLatency = NewTimerDef("visibility_persistence_latency") + PersistenceErrPerShardPerNamespace = NewDimensionlessHistogramDef("persistence_error_ratio_per_shard_per_ns") + PersistenceAvgLatencyPerShardPerNamespace = NewDimensionlessHistogramDef("persistence_average_latency_per_shard_per_namespace") ) diff --git a/common/metrics/tags.go b/common/metrics/tags.go index 24f4d5ba64e..0b96544a6d3 100644 --- a/common/metrics/tags.go +++ b/common/metrics/tags.go @@ -44,6 +44,7 @@ const ( instance = "instance" namespace = "namespace" namespaceState = "namespace_state" + shard = "shard" targetCluster = "target_cluster" fromCluster = "from_cluster" toCluster = "to_cluster" @@ -131,6 +132,10 @@ func InstanceTag(value string) Tag { return &tagImpl{key: instance, value: value} } +func ShardTag(value int32) Tag { + return &tagImpl{key: shard, value: string(value)} +} + // TargetClusterTag returns a new target cluster tag. func TargetClusterTag(value string) Tag { if len(value) == 0 { diff --git a/common/resource/fx.go b/common/resource/fx.go index c160c29e175..f443864f4db 100644 --- a/common/resource/fx.go +++ b/common/resource/fx.go @@ -219,10 +219,12 @@ func NamespaceRegistryProvider( func PersistenceHealthSignalAggregatorProvider( dynamicCollection *dynamicconfig.Collection, + metricsHandler metrics.Handler, ) aggregate.SignalAggregator[quotas.Request] { return aggregate.NewPerShardPerNsHealthSignalAggregator( dynamicCollection.GetDurationProperty(dynamicconfig.PersistenceHealthSignalWindowSize, 3*time.Second), dynamicCollection.GetIntProperty(dynamicconfig.PersistenceHealthSignalBufferSize, 500), + metricsHandler, ) }