Skip to content

Commit

Permalink
add metric emission
Browse files Browse the repository at this point in the history
  • Loading branch information
pdoerner committed May 22, 2023
1 parent 9543764 commit 859950e
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 12 deletions.
48 changes: 40 additions & 8 deletions common/aggregate/persistence_health_signal_aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,14 @@ import (
"time"

"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/quotas"
)

type (
PersistenceHealthSignalAggregator[K comparable] struct {
PersistenceHealthSignalAggregator[K SignalKey] struct {
SignalAggregator[quotas.Request]

keyMapper SignalKeyMapperFn[quotas.Request, K]

latencyAverages map[K]MovingWindowAverage
Expand All @@ -45,6 +47,9 @@ type (

windowSize time.Duration
maxBufferSize int

metricsHandler metrics.Handler
emitMetricsTimer *time.Ticker
}

perShardPerNsHealthSignalKey struct {
Expand All @@ -53,28 +58,35 @@ type (
}
)

func NewPersistenceHealthSignalAggregator[K comparable](
func NewPersistenceHealthSignalAggregator[K SignalKey](
keyMapper SignalKeyMapperFn[quotas.Request, K],
windowSize time.Duration,
maxBufferSize int,
metricsHandler metrics.Handler,
) *PersistenceHealthSignalAggregator[K] {
return &PersistenceHealthSignalAggregator[K]{
keyMapper: keyMapper,
latencyAverages: make(map[K]MovingWindowAverage),
errorRatios: make(map[K]MovingWindowAverage),
windowSize: windowSize,
maxBufferSize: maxBufferSize,
ret := &PersistenceHealthSignalAggregator[K]{
keyMapper: keyMapper,
latencyAverages: make(map[K]MovingWindowAverage),
errorRatios: make(map[K]MovingWindowAverage),
windowSize: windowSize,
maxBufferSize: maxBufferSize,
metricsHandler: metricsHandler,
emitMetricsTimer: time.NewTicker(windowSize),
}
go ret.emitMetricsLoop()
return ret
}

// NewPerShardPerNsHealthSignalAggregator builds a persistence health signal
// aggregator keyed by (shard ID, namespace), resolving the window size and
// buffer size from dynamic config at construction time.
func NewPerShardPerNsHealthSignalAggregator(
	windowSize dynamicconfig.DurationPropertyFn,
	maxBufferSize dynamicconfig.IntPropertyFn,
	metricsHandler metrics.Handler,
) *PersistenceHealthSignalAggregator[perShardPerNsHealthSignalKey] {
	// Snapshot the dynamic config values once; the aggregator does not
	// observe later changes to these properties.
	window := windowSize()
	bufferSize := maxBufferSize()
	return NewPersistenceHealthSignalAggregator[perShardPerNsHealthSignalKey](
		perShardPerNsKeyMapperFn,
		window,
		bufferSize,
		metricsHandler,
	)
}

Expand All @@ -85,6 +97,12 @@ func perShardPerNsKeyMapperFn(req quotas.Request) perShardPerNsHealthSignalKey {
}
}

// GetMetricTags returns the metric tags identifying this signal key:
// its namespace name and its shard ID.
func (k perShardPerNsHealthSignalKey) GetMetricTags() []metrics.Tag {
	return []metrics.Tag{
		metrics.NamespaceTag(k.namespace),
		metrics.ShardTag(k.shardID),
	}
}

func (s *PersistenceHealthSignalAggregator[_]) GetRecordFn(req quotas.Request) func(err error) {
start := time.Now()
return func(err error) {
Expand Down Expand Up @@ -141,3 +159,17 @@ func (s *PersistenceHealthSignalAggregator[K]) getOrInitAverage(
(*averages)[key] = newAvg
return newAvg
}

// emitMetricsLoop publishes, once per windowSize tick, the current per-key
// latency averages and error ratios as gauge metrics, tagged with each
// key's metric tags.
//
// NOTE(review): this ranges over s.latencyAverages and s.errorRatios with
// no visible locking while other methods of the aggregator create entries
// in those maps — presumably a guard exists in the collapsed part of the
// struct; if not, this is a data race. Verify against the full file.
// NOTE(review): the loop has no exit condition and the ticker is never
// stopped, so this goroutine runs for the life of the process.
func (s *PersistenceHealthSignalAggregator[_]) emitMetricsLoop() {
	for {
		select {
		case <-s.emitMetricsTimer.C:
			// Emit one gauge sample per tracked key for each signal type.
			for key, avg := range s.latencyAverages {
				s.metricsHandler.Gauge(metrics.PersistenceAvgLatencyPerShardPerNamespace.GetMetricName()).Record(avg.Average(), key.GetMetricTags()...)
			}
			for key, ratio := range s.errorRatios {
				s.metricsHandler.Gauge(metrics.PersistenceErrPerShardPerNamespace.GetMetricName()).Record(ratio.Average(), key.GetMetricTags()...)
			}
		}
	}
}
15 changes: 11 additions & 4 deletions common/aggregate/signal_aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,19 @@

package aggregate

import "go.temporal.io/server/common/metrics"

type (
SignalKeyMapperFn[T any, K comparable] func(signalOrigin T) K
SignalKey interface {
comparable
GetMetricTags() []metrics.Tag
}

SignalKeyMapperFn[T any, K SignalKey] func(origin T) K

SignalAggregator[T any] interface {
GetRecordFn(key T) func(err error)
AverageLatency(key T) float64
ErrorRatio(key T) float64
GetRecordFn(origin T) func(err error)
AverageLatency(origin T) float64
ErrorRatio(origin T) float64
}
)
2 changes: 2 additions & 0 deletions common/metrics/metric_defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1663,4 +1663,6 @@ var (
VisibilityPersistenceFailures = NewCounterDef("visibility_persistence_errors")
VisibilityPersistenceResourceExhausted = NewCounterDef("visibility_persistence_resource_exhausted")
VisibilityPersistenceLatency = NewTimerDef("visibility_persistence_latency")
PersistenceErrPerShardPerNamespace = NewDimensionlessHistogramDef("persistence_error_ratio_per_shard_per_ns")
PersistenceAvgLatencyPerShardPerNamespace = NewDimensionlessHistogramDef("persistence_average_latency_per_shard_per_namespace")
)
5 changes: 5 additions & 0 deletions common/metrics/tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ const (
instance = "instance"
namespace = "namespace"
namespaceState = "namespace_state"
shard = "shard"
targetCluster = "target_cluster"
fromCluster = "from_cluster"
toCluster = "to_cluster"
Expand Down Expand Up @@ -131,6 +132,10 @@ func InstanceTag(value string) Tag {
return &tagImpl{key: instance, value: value}
}

// ShardTag returns a new shard ID tag with the ID rendered in decimal.
//
// Bug fix: the original `string(value)` converts the int32 to the UTF-8
// encoding of that Unicode code point (e.g. shard 65 -> "A", invalid code
// points -> "\uFFFD"), not its decimal representation; `go vet` flags this
// conversion. strconv renders the number correctly.
// NOTE(review): requires "strconv" in this file's import block — confirm
// it is present (not visible in this diff) and add it if missing.
func ShardTag(value int32) Tag {
	return &tagImpl{key: shard, value: strconv.Itoa(int(value))}
}

// TargetClusterTag returns a new target cluster tag.
func TargetClusterTag(value string) Tag {
if len(value) == 0 {
Expand Down
2 changes: 2 additions & 0 deletions common/resource/fx.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,10 +219,12 @@ func NamespaceRegistryProvider(

// PersistenceHealthSignalAggregatorProvider constructs the per-shard,
// per-namespace persistence health signal aggregator, wiring in its
// dynamic config knobs and the metrics handler.
func PersistenceHealthSignalAggregatorProvider(
	dynamicCollection *dynamicconfig.Collection,
	metricsHandler metrics.Handler,
) aggregate.SignalAggregator[quotas.Request] {
	// Dynamic config properties with their defaults: 3s signal window,
	// 500-entry signal buffer.
	windowSize := dynamicCollection.GetDurationProperty(dynamicconfig.PersistenceHealthSignalWindowSize, 3*time.Second)
	bufferSize := dynamicCollection.GetIntProperty(dynamicconfig.PersistenceHealthSignalBufferSize, 500)
	return aggregate.NewPerShardPerNsHealthSignalAggregator(windowSize, bufferSize, metricsHandler)
}

Expand Down

0 comments on commit 859950e

Please sign in to comment.