diff --git a/common/aggregate/bench_moving_window_avg_test.go b/common/aggregate/bench_moving_window_avg_test.go new file mode 100644 index 00000000000..3500b789e29 --- /dev/null +++ b/common/aggregate/bench_moving_window_avg_test.go @@ -0,0 +1,49 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package aggregate + +import ( + "math/rand" + "testing" + "time" +) + +// BenchmarkArrayMovingWindowAvg +// BenchmarkArrayMovingWindowAvg-10 17021074 66.27 ns/op + +const ( + testWindowSize = 10 * time.Millisecond + testBufferSize = 200 +) + +func BenchmarkArrayMovingWindowAvg(b *testing.B) { + avg := NewMovingWindowAvgImpl(testWindowSize, testBufferSize) + for i := 0; i < b.N; i++ { + avg.Record(rand.Int63()) + if i%10 == 0 { + avg.Average() + } + } +} diff --git a/common/aggregate/moving_window_average.go b/common/aggregate/moving_window_average.go new file mode 100644 index 00000000000..ad5d93cfc0c --- /dev/null +++ b/common/aggregate/moving_window_average.go @@ -0,0 +1,103 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package aggregate + +import ( + "sync" + "time" +) + +type ( + MovingWindowAverage interface { + Record(val int64) + Average() float64 + } + + timestampedData struct { + value int64 + timestamp time.Time + } + + MovingWindowAvgImpl struct { + sync.Mutex + windowSize time.Duration + maxBufferSize int + buffer []timestampedData + headIdx int + tailIdx int + sum int64 + count int64 + } +) + +func NewMovingWindowAvgImpl( + windowSize time.Duration, + maxBufferSize int, +) *MovingWindowAvgImpl { + return &MovingWindowAvgImpl{ + windowSize: windowSize, + maxBufferSize: maxBufferSize, + buffer: make([]timestampedData, maxBufferSize), + } +} + +func (a *MovingWindowAvgImpl) Record(val int64) { + a.Lock() + defer a.Unlock() + + a.buffer[a.tailIdx] = timestampedData{timestamp: time.Now(), value: val} + a.tailIdx = (a.tailIdx + 1) % a.maxBufferSize + + a.sum += val + a.count++ + + if a.tailIdx == a.headIdx { + // buffer full, expire oldest element + a.sum -= a.buffer[a.headIdx].value + a.count-- + a.headIdx = (a.headIdx + 1) % a.maxBufferSize + } +} + +func (a *MovingWindowAvgImpl) Average() float64 { + a.Lock() + defer a.Unlock() + + a.expireOldValuesLocked() + if a.count == 0 { + return 0 + } + return float64(a.sum) / float64(a.count) +} + +func (a *MovingWindowAvgImpl) expireOldValuesLocked() { + for ; a.headIdx != a.tailIdx; a.headIdx = (a.headIdx + 1) % a.maxBufferSize { + if time.Since(a.buffer[a.headIdx].timestamp) < a.windowSize { + break + } + a.sum -= a.buffer[a.headIdx].value + a.count-- + } +} diff --git a/common/dynamicconfig/constants.go b/common/dynamicconfig/constants.go index 6a30fe44399..b8277e03a7f 100644 --- a/common/dynamicconfig/constants.go +++ b/common/dynamicconfig/constants.go @@ -106,6 +106,14 @@ const ( EnableEagerWorkflowStart = "system.enableEagerWorkflowStart" // NamespaceCacheRefreshInterval is the key for namespace cache refresh interval dynamic config NamespaceCacheRefreshInterval = "system.namespaceCacheRefreshInterval" + // PersistenceHealthSignalCollectionEnabled determines whether persistence health signal collection/aggregation is enabled + PersistenceHealthSignalCollectionEnabled = "system.persistenceHealthSignalCollectionEnabled" + // PersistenceHealthSignalWindowSize is the time window size in seconds for aggregating persistence signals + PersistenceHealthSignalWindowSize = "system.persistenceHealthSignalWindowSize" + // PersistenceHealthSignalBufferSize is the maximum number of persistence signals to buffer in memory per signal key + PersistenceHealthSignalBufferSize = "system.persistenceHealthSignalBufferSize" + // ShardRPSWarnLimit is the per-shard RPS limit for warning + ShardRPSWarnLimit = "system.shardRPSWarnLimit" // Whether the deadlock detector should dump goroutines DeadlockDumpGoroutines = "system.deadlock.DumpGoroutines" diff --git a/common/metrics/metric_defs.go b/common/metrics/metric_defs.go index 3fe9ed6fb9b..1ff95ccaaf4 100644 --- a/common/metrics/metric_defs.go +++ b/common/metrics/metric_defs.go @@ -1648,6 +1648,7 @@ var ( PersistenceFailures = NewCounterDef("persistence_errors") PersistenceErrorWithType = NewCounterDef("persistence_error_with_type") PersistenceLatency = NewTimerDef("persistence_latency") + PersistenceShardRPS = NewDimensionlessHistogramDef("persistence_shard_rps") PersistenceErrShardExistsCounter = NewCounterDef("persistence_errors_shard_exists") PersistenceErrShardOwnershipLostCounter = NewCounterDef("persistence_errors_shard_ownership_lost") PersistenceErrConditionFailedCounter = NewCounterDef("persistence_errors_condition_failed") diff --git a/common/persistence/client/factory.go b/common/persistence/client/factory.go index 47caabbd474..4242fa11528 100644 --- a/common/persistence/client/factory.go +++ b/common/persistence/client/factory.go @@ -26,7 +26,6 @@ package client import ( "go.temporal.io/api/serviceerror" - "go.temporal.io/server/common" "go.temporal.io/server/common/config" "go.temporal.io/server/common/log" @@ -69,6 +68,7 @@ type ( logger log.Logger clusterName string ratelimiter quotas.RequestRateLimiter + healthSignals p.HealthSignalAggregator } ) @@ -87,8 +87,9 @@ func NewFactory( clusterName string, metricsHandler metrics.Handler, logger log.Logger, + healthSignals p.HealthSignalAggregator, ) Factory { - return &factoryImpl{ + factory := &factoryImpl{ dataStoreFactory: dataStoreFactory, config: cfg, serializer: serializer, @@ -96,7 +97,10 @@ func NewFactory( logger: logger, clusterName: clusterName, ratelimiter: ratelimiter, + healthSignals: healthSignals, } + factory.initDependencies() + return factory } // NewTaskManager returns a new task manager @@ -110,8 +114,8 @@ func (f *factoryImpl) NewTaskManager() (p.TaskManager, error) { if f.ratelimiter != nil { result = p.NewTaskPersistenceRateLimitedClient(result, f.ratelimiter, f.logger) } - if f.metricsHandler != nil { - result = p.NewTaskPersistenceMetricsClient(result, f.metricsHandler, f.logger) + if f.metricsHandler != nil && f.healthSignals != nil { + result = p.NewTaskPersistenceMetricsClient(result, f.metricsHandler, f.healthSignals, f.logger) } return result, nil } @@ -127,8 +131,8 @@ func (f *factoryImpl) NewShardManager() (p.ShardManager, error) { if f.ratelimiter != nil { result = p.NewShardPersistenceRateLimitedClient(result, f.ratelimiter, f.logger) } - if f.metricsHandler != nil { - result = p.NewShardPersistenceMetricsClient(result, f.metricsHandler, f.logger) + if f.metricsHandler != nil && f.healthSignals != nil { + result = p.NewShardPersistenceMetricsClient(result, f.metricsHandler, f.healthSignals, f.logger) } result = p.NewShardPersistenceRetryableClient(result, retryPolicy, IsPersistenceTransientError) return result, nil @@ -145,8 +149,8 @@ func (f *factoryImpl) NewMetadataManager() (p.MetadataManager, error) { if f.ratelimiter != nil { result = p.NewMetadataPersistenceRateLimitedClient(result, f.ratelimiter, f.logger) } - if f.metricsHandler != nil { - result = p.NewMetadataPersistenceMetricsClient(result, f.metricsHandler, f.logger) + if f.metricsHandler != nil && f.healthSignals != nil { + result = p.NewMetadataPersistenceMetricsClient(result, f.metricsHandler, f.healthSignals, f.logger) } result = p.NewMetadataPersistenceRetryableClient(result, retryPolicy, IsPersistenceTransientError) return result, nil @@ -163,8 +167,8 @@ func (f *factoryImpl) NewClusterMetadataManager() (p.ClusterMetadataManager, err if f.ratelimiter != nil { result = p.NewClusterMetadataPersistenceRateLimitedClient(result, f.ratelimiter, f.logger) } - if f.metricsHandler != nil { - result = p.NewClusterMetadataPersistenceMetricsClient(result, f.metricsHandler, f.logger) + if f.metricsHandler != nil && f.healthSignals != nil { + result = p.NewClusterMetadataPersistenceMetricsClient(result, f.metricsHandler, f.healthSignals, f.logger) } result = p.NewClusterMetadataPersistenceRetryableClient(result, retryPolicy, IsPersistenceTransientError) return result, nil @@ -181,8 +185,8 @@ func (f *factoryImpl) NewExecutionManager() (p.ExecutionManager, error) { if f.ratelimiter != nil { result = p.NewExecutionPersistenceRateLimitedClient(result, f.ratelimiter, f.logger) } - if f.metricsHandler != nil { - result = p.NewExecutionPersistenceMetricsClient(result, f.metricsHandler, f.logger) + if f.metricsHandler != nil && f.healthSignals != nil { + result = p.NewExecutionPersistenceMetricsClient(result, f.metricsHandler, f.healthSignals, f.logger) } result = p.NewExecutionPersistenceRetryableClient(result, retryPolicy, IsPersistenceTransientError) return result, nil @@ -197,8 +201,8 @@ func (f *factoryImpl) NewNamespaceReplicationQueue() (p.NamespaceReplicationQueu if f.ratelimiter != nil { result = p.NewQueuePersistenceRateLimitedClient(result, f.ratelimiter, f.logger) } - if f.metricsHandler != nil { - result = p.NewQueuePersistenceMetricsClient(result, f.metricsHandler, f.logger) + if f.metricsHandler != nil && f.healthSignals != nil { + result = p.NewQueuePersistenceMetricsClient(result, f.metricsHandler, f.healthSignals, f.logger) } result = p.NewQueuePersistenceRetryableClient(result, retryPolicy, IsPersistenceTransientError) return p.NewNamespaceReplicationQueue(result, f.serializer, f.clusterName, f.metricsHandler, f.logger) @@ -207,6 +211,9 @@ func (f *factoryImpl) NewNamespaceReplicationQueue() (p.NamespaceReplicationQueu // Close closes this factory func (f *factoryImpl) Close() { f.dataStoreFactory.Close() + if f.healthSignals != nil { + f.healthSignals.Stop() + } } func IsPersistenceTransientError(err error) bool { @@ -217,3 +224,17 @@ func IsPersistenceTransientError(err error) bool { return false } + +func (f *factoryImpl) initDependencies() { + if f.metricsHandler == nil && f.healthSignals == nil { + return + } + + if f.metricsHandler == nil { + f.metricsHandler = metrics.NoopMetricsHandler + } + if f.healthSignals == nil { + f.healthSignals = p.NoopHealthSignalAggregator + } + f.healthSignals.Start() +} diff --git a/common/persistence/client/fx.go b/common/persistence/client/fx.go index f2876f647b4..2a238b936b3 100644 --- a/common/persistence/client/fx.go +++ b/common/persistence/client/fx.go @@ -25,6 +25,8 @@ package client import ( + "time" + "go.uber.org/fx" "go.temporal.io/server/common/cluster" @@ -32,6 +34,7 @@ import ( "go.temporal.io/server/common/dynamicconfig" "go.temporal.io/server/common/log" "go.temporal.io/server/common/metrics" + "go.temporal.io/server/common/persistence" "go.temporal.io/server/common/persistence/serialization" "go.temporal.io/server/common/primitives" "go.temporal.io/server/common/quotas" @@ -57,6 +60,7 @@ type ( ServiceName primitives.ServiceName MetricsHandler metrics.Handler Logger log.Logger + HealthSignals persistence.HealthSignalAggregator } FactoryProviderFn func(NewFactoryParams) Factory @@ -66,6 +70,7 @@ var Module = fx.Options( BeanModule, fx.Provide(ClusterNameProvider), fx.Provide(DataStoreFactoryProvider), + fx.Provide(HealthSignalAggregatorProvider), ) func ClusterNameProvider(config *cluster.Config) ClusterName { @@ -97,5 +102,24 @@ func FactoryProvider( string(params.ClusterName), params.MetricsHandler, params.Logger, + params.HealthSignals, ) } + +func HealthSignalAggregatorProvider( + dynamicCollection *dynamicconfig.Collection, + metricsHandler metrics.Handler, + logger log.Logger, +) persistence.HealthSignalAggregator { + if dynamicCollection.GetBoolProperty(dynamicconfig.PersistenceHealthSignalCollectionEnabled, true)() { + return persistence.NewHealthSignalAggregatorImpl( + dynamicCollection.GetDurationProperty(dynamicconfig.PersistenceHealthSignalWindowSize, 3*time.Second)(), + dynamicCollection.GetIntProperty(dynamicconfig.PersistenceHealthSignalBufferSize, 500)(), + metricsHandler, + dynamicCollection.GetIntProperty(dynamicconfig.ShardRPSWarnLimit, 50), + logger, + ) + } + + return persistence.NoopHealthSignalAggregator +} diff --git a/common/persistence/health_signal_aggregator.go b/common/persistence/health_signal_aggregator.go new file mode 100644 index 00000000000..b9ce009b56c --- /dev/null +++ b/common/persistence/health_signal_aggregator.go @@ -0,0 +1,170 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package persistence + +import ( + "sync" + "sync/atomic" + "time" + + "go.temporal.io/server/common" + "go.temporal.io/server/common/aggregate" + "go.temporal.io/server/common/dynamicconfig" + "go.temporal.io/server/common/log" + "go.temporal.io/server/common/log/tag" + "go.temporal.io/server/common/metrics" +) + +const ( + emitMetricsInterval = 30 * time.Second +) + +type ( + HealthSignalAggregator interface { + common.Daemon + Record(callerSegment int32, latency time.Duration, err error) + AverageLatency() float64 + ErrorRatio() float64 + } + + HealthSignalAggregatorImpl struct { + status int32 + shutdownCh chan struct{} + + requestsPerShard map[int32]int64 + requestsLock sync.Mutex + + latencyAverage aggregate.MovingWindowAverage + errorRatio aggregate.MovingWindowAverage + + metricsHandler metrics.Handler + emitMetricsTimer *time.Ticker + perShardRPSWarnLimit dynamicconfig.IntPropertyFn + + logger log.Logger + } +) + +func NewHealthSignalAggregatorImpl( + windowSize time.Duration, + maxBufferSize int, + metricsHandler metrics.Handler, + perShardRPSWarnLimit dynamicconfig.IntPropertyFn, + logger log.Logger, +) *HealthSignalAggregatorImpl { + return &HealthSignalAggregatorImpl{ + status: common.DaemonStatusInitialized, + shutdownCh: make(chan struct{}), + requestsPerShard: make(map[int32]int64), + latencyAverage: aggregate.NewMovingWindowAvgImpl(windowSize, maxBufferSize), + errorRatio: aggregate.NewMovingWindowAvgImpl(windowSize, maxBufferSize), + metricsHandler: metricsHandler, + emitMetricsTimer: time.NewTicker(emitMetricsInterval), + perShardRPSWarnLimit: perShardRPSWarnLimit, + logger: logger, + } +} + +func (s *HealthSignalAggregatorImpl) Start() { + if !atomic.CompareAndSwapInt32(&s.status, common.DaemonStatusInitialized, common.DaemonStatusStarted) { + return + } + go s.emitMetricsLoop() +} + +func (s *HealthSignalAggregatorImpl) Stop() { + if !atomic.CompareAndSwapInt32(&s.status, common.DaemonStatusStarted, common.DaemonStatusStopped) { + return + } + close(s.shutdownCh) + s.emitMetricsTimer.Stop() +} + +func (s *HealthSignalAggregatorImpl) Record(callerSegment int32, latency time.Duration, err error) { + // TODO: uncomment when adding dynamic rate limiter + //s.latencyAverage.Record(latency.Milliseconds()) + // + //if isUnhealthyError(err) { + // s.errorRatio.Record(1) + //} else { + // s.errorRatio.Record(0) + //} + + if callerSegment != CallerSegmentMissing { + s.incrementShardRequestCount(callerSegment) + } +} + +func (s *HealthSignalAggregatorImpl) AverageLatency() float64 { + return s.latencyAverage.Average() +} + +func (s *HealthSignalAggregatorImpl) ErrorRatio() float64 { + return s.errorRatio.Average() +} + +func (s *HealthSignalAggregatorImpl) incrementShardRequestCount(shardID int32) { + s.requestsLock.Lock() + defer s.requestsLock.Unlock() + s.requestsPerShard[shardID]++ +} + +func (s *HealthSignalAggregatorImpl) emitMetricsLoop() { + for { + select { + case <-s.shutdownCh: + return + case <-s.emitMetricsTimer.C: + s.requestsLock.Lock() + requestCounts := s.requestsPerShard + s.requestsPerShard = make(map[int32]int64, len(requestCounts)) + s.requestsLock.Unlock() + + for shardID, count := range requestCounts { + shardRPS := int64(float64(count) / emitMetricsInterval.Seconds()) + s.metricsHandler.Histogram(metrics.PersistenceShardRPS.GetMetricName(), metrics.PersistenceShardRPS.GetMetricUnit()).Record(shardRPS) + if shardRPS > int64(s.perShardRPSWarnLimit()) { + s.logger.Warn("Per shard RPS warn limit exceeded", tag.ShardID(shardID)) + } + } + } + } +} + +// TODO: uncomment when adding dynamic rate limiter +//func isUnhealthyError(err error) bool { +// if err == nil { +// return false +// } +// switch err.(type) { +// case *ShardOwnershipLostError, +// *AppendHistoryTimeoutError, +// *TimeoutError: +// return true +// +// default: +// return false +// } +//} diff --git a/common/persistence/noop_health_signal_aggregator.go b/common/persistence/noop_health_signal_aggregator.go new file mode 100644 index 00000000000..dff0c151d28 --- /dev/null +++ b/common/persistence/noop_health_signal_aggregator.go @@ -0,0 +1,51 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package persistence + +import ( + "time" +) + +var NoopHealthSignalAggregator HealthSignalAggregator = newNoopSignalAggregator() + +type ( + noopSignalAggregator struct{} +) + +func newNoopSignalAggregator() *noopSignalAggregator { return &noopSignalAggregator{} } + +func (a *noopSignalAggregator) Start() {} + +func (a *noopSignalAggregator) Stop() {} + +func (a *noopSignalAggregator) Record(_ int32, _ time.Duration, _ error) {} + +func (a *noopSignalAggregator) AverageLatency() float64 { + return 0 +} + +func (*noopSignalAggregator) ErrorRatio() float64 { + return 0 +} diff --git a/common/persistence/persistence-tests/persistenceTestBase.go b/common/persistence/persistence-tests/persistenceTestBase.go index a1b3bc4b922..02a88204a09 100644 --- a/common/persistence/persistence-tests/persistenceTestBase.go +++ b/common/persistence/persistence-tests/persistenceTestBase.go @@ -33,7 +33,6 @@ import ( "time" "github.com/stretchr/testify/suite" - persistencespb "go.temporal.io/server/api/persistence/v1" replicationspb "go.temporal.io/server/api/replication/v1" "go.temporal.io/server/common" @@ -98,6 +97,7 @@ type ( TaskIDGenerator TransferTaskIDGenerator ClusterMetadata cluster.Metadata SearchAttributesManager searchattribute.Manager + PersistenceHealthSignals persistence.HealthSignalAggregator ReadLevel int64 ReplicationReadLevel int64 DefaultTestCluster PersistenceTestCluster @@ -202,7 +202,7 @@ func (s *TestBase) Setup(clusterMetadataConfig *cluster.Config) { s.Logger, metrics.NoopMetricsHandler, ) - factory := client.NewFactory(dataStoreFactory, &cfg, nil, serialization.NewSerializer(), clusterName, metrics.NoopMetricsHandler, s.Logger) + factory := client.NewFactory(dataStoreFactory, &cfg, nil, serialization.NewSerializer(), clusterName, metrics.NoopMetricsHandler, s.Logger, persistence.NoopHealthSignalAggregator) s.TaskMgr, err = factory.NewTaskManager() s.fatalOnError("NewTaskManager", err) diff --git a/common/persistence/persistenceMetricClients.go b/common/persistence/persistenceMetricClients.go index 703831d7713..5358ffcbd8b 100644 --- a/common/persistence/persistenceMetricClients.go +++ b/common/persistence/persistenceMetricClients.go @@ -31,7 +31,6 @@ import ( commonpb "go.temporal.io/api/common/v1" "go.temporal.io/api/serviceerror" - "go.temporal.io/server/common/headers" "go.temporal.io/server/common/log" "go.temporal.io/server/common/log/tag" @@ -47,32 +46,38 @@ type ( shardPersistenceClient struct { metricEmitter - persistence ShardManager + healthSignals HealthSignalAggregator + persistence ShardManager } executionPersistenceClient struct { metricEmitter - persistence ExecutionManager + healthSignals HealthSignalAggregator + persistence ExecutionManager } taskPersistenceClient struct { metricEmitter - persistence TaskManager + healthSignals HealthSignalAggregator + persistence TaskManager } metadataPersistenceClient struct { metricEmitter - persistence MetadataManager + healthSignals HealthSignalAggregator + persistence MetadataManager } clusterMetadataPersistenceClient struct { metricEmitter - persistence ClusterMetadataManager + healthSignals HealthSignalAggregator + persistence ClusterMetadataManager } queuePersistenceClient struct { metricEmitter - persistence Queue + healthSignals HealthSignalAggregator + persistence Queue } ) @@ -84,68 +89,74 @@ var _ ClusterMetadataManager = (*clusterMetadataPersistenceClient)(nil) var _ Queue = (*queuePersistenceClient)(nil) // NewShardPersistenceMetricsClient creates a client to manage shards -func NewShardPersistenceMetricsClient(persistence ShardManager, metricsHandler metrics.Handler, logger log.Logger) ShardManager { +func NewShardPersistenceMetricsClient(persistence ShardManager, metricsHandler metrics.Handler, healthSignals HealthSignalAggregator, logger log.Logger) ShardManager { return &shardPersistenceClient{ metricEmitter: metricEmitter{ metricsHandler: metricsHandler, logger: logger, }, - persistence: persistence, + healthSignals: healthSignals, + persistence: persistence, } } // NewExecutionPersistenceMetricsClient creates a client to manage executions -func NewExecutionPersistenceMetricsClient(persistence ExecutionManager, metricsHandler metrics.Handler, logger log.Logger) ExecutionManager { +func NewExecutionPersistenceMetricsClient(persistence ExecutionManager, metricsHandler metrics.Handler, healthSignals HealthSignalAggregator, logger log.Logger) ExecutionManager { return &executionPersistenceClient{ metricEmitter: metricEmitter{ metricsHandler: metricsHandler, logger: logger, }, - persistence: persistence, + healthSignals: healthSignals, + persistence: persistence, } } // NewTaskPersistenceMetricsClient creates a client to manage tasks -func NewTaskPersistenceMetricsClient(persistence TaskManager, metricsHandler metrics.Handler, logger log.Logger) TaskManager { +func NewTaskPersistenceMetricsClient(persistence TaskManager, metricsHandler metrics.Handler, healthSignals HealthSignalAggregator, logger log.Logger) TaskManager { return &taskPersistenceClient{ metricEmitter: metricEmitter{ metricsHandler: metricsHandler, logger: logger, }, - persistence: persistence, + healthSignals: healthSignals, + persistence: persistence, } } // NewMetadataPersistenceMetricsClient creates a MetadataManager client to manage metadata -func NewMetadataPersistenceMetricsClient(persistence MetadataManager, metricsHandler metrics.Handler, logger log.Logger) MetadataManager { +func NewMetadataPersistenceMetricsClient(persistence MetadataManager, metricsHandler metrics.Handler, healthSignals HealthSignalAggregator, logger log.Logger) MetadataManager { return &metadataPersistenceClient{ metricEmitter: metricEmitter{ metricsHandler: metricsHandler, logger: logger, }, - persistence: persistence, + healthSignals: healthSignals, + persistence: persistence, } } // NewClusterMetadataPersistenceMetricsClient creates a ClusterMetadataManager client to manage cluster metadata -func NewClusterMetadataPersistenceMetricsClient(persistence ClusterMetadataManager, metricsHandler metrics.Handler, logger log.Logger) ClusterMetadataManager { +func NewClusterMetadataPersistenceMetricsClient(persistence ClusterMetadataManager, metricsHandler metrics.Handler, healthSignals HealthSignalAggregator, logger log.Logger) ClusterMetadataManager { return &clusterMetadataPersistenceClient{ metricEmitter: metricEmitter{ metricsHandler: metricsHandler, logger: logger, }, - persistence: persistence, + healthSignals: healthSignals, + persistence: persistence, } } // NewQueuePersistenceMetricsClient creates a client to manage queue -func NewQueuePersistenceMetricsClient(persistence Queue, metricsHandler metrics.Handler, logger log.Logger) Queue { +func NewQueuePersistenceMetricsClient(persistence Queue, metricsHandler metrics.Handler, healthSignals HealthSignalAggregator, logger log.Logger) Queue { return &queuePersistenceClient{ metricEmitter: metricEmitter{ metricsHandler: metricsHandler, logger: logger, }, - persistence: persistence, + healthSignals: healthSignals, + persistence: persistence, } } @@ -160,7 +171,9 @@ func (p *shardPersistenceClient) GetOrCreateShard( caller := headers.GetCallerInfo(ctx).CallerName startTime := time.Now().UTC() defer func() { - p.recordRequestMetrics(metrics.PersistenceGetOrCreateShardScope, caller, startTime, retErr) + latency := time.Since(startTime) + p.healthSignals.Record(request.ShardID, latency, retErr) + p.recordRequestMetrics(metrics.PersistenceGetOrCreateShardScope, caller, latency, retErr) }() return p.persistence.GetOrCreateShard(ctx, request) } @@ -172,7 +185,9 @@ func (p *shardPersistenceClient) UpdateShard( caller := headers.GetCallerInfo(ctx).CallerName startTime := time.Now().UTC() defer func() { - p.recordRequestMetrics(metrics.PersistenceUpdateShardScope, caller, startTime, retErr) + latency := time.Since(startTime) + p.healthSignals.Record(request.ShardInfo.GetShardId(), latency, retErr) + p.recordRequestMetrics(metrics.PersistenceUpdateShardScope, caller, latency, retErr) }() return p.persistence.UpdateShard(ctx, request) } @@ -184,7 +199,9 @@ func (p *shardPersistenceClient) AssertShardOwnership( caller := headers.GetCallerInfo(ctx).CallerName startTime := time.Now().UTC() defer func() { - p.recordRequestMetrics(metrics.PersistenceAssertShardOwnershipScope, caller, startTime, retErr) + latency := time.Since(startTime) + p.healthSignals.Record(request.ShardID, latency, retErr) + p.recordRequestMetrics(metrics.PersistenceAssertShardOwnershipScope, caller, latency, retErr) }() return p.persistence.AssertShardOwnership(ctx, request) } @@ -208,7 +225,9 @@ func (p *executionPersistenceClient) CreateWorkflowExecution( caller := headers.GetCallerInfo(ctx).CallerName startTime := time.Now().UTC() defer func() { - p.recordRequestMetrics(metrics.PersistenceCreateWorkflowExecutionScope, caller, startTime, retErr) + latency := time.Since(startTime) + p.healthSignals.Record(request.ShardID, latency, retErr) + p.recordRequestMetrics(metrics.PersistenceCreateWorkflowExecutionScope, caller, latency, retErr) }() return p.persistence.CreateWorkflowExecution(ctx, request) } @@ -220,7 +239,9 @@ func (p *executionPersistenceClient) GetWorkflowExecution( caller := headers.GetCallerInfo(ctx).CallerName startTime := time.Now().UTC() defer func() { - p.recordRequestMetrics(metrics.PersistenceGetWorkflowExecutionScope, caller, startTime, retErr) + latency := time.Since(startTime) + p.healthSignals.Record(request.ShardID, latency, retErr) + p.recordRequestMetrics(metrics.PersistenceGetWorkflowExecutionScope, caller, latency, retErr) }() return p.persistence.GetWorkflowExecution(ctx, request) } @@ -232,7 +253,9 @@ func (p *executionPersistenceClient) SetWorkflowExecution( caller := headers.GetCallerInfo(ctx).CallerName startTime := time.Now().UTC() defer func() { - p.recordRequestMetrics(metrics.PersistenceSetWorkflowExecutionScope, caller, startTime, retErr) + latency := time.Since(startTime) + p.healthSignals.Record(request.ShardID, latency, retErr) + p.recordRequestMetrics(metrics.PersistenceSetWorkflowExecutionScope, caller, latency, retErr) }() return p.persistence.SetWorkflowExecution(ctx, request) } @@ -244,7 +267,9 @@ func (p *executionPersistenceClient) UpdateWorkflowExecution( caller := headers.GetCallerInfo(ctx).CallerName startTime := time.Now().UTC() defer func() { - p.recordRequestMetrics(metrics.PersistenceUpdateWorkflowExecutionScope, caller, startTime, retErr) + latency := time.Since(startTime) + p.healthSignals.Record(request.ShardID, latency, retErr) + p.recordRequestMetrics(metrics.PersistenceUpdateWorkflowExecutionScope, caller, latency, retErr) }() return p.persistence.UpdateWorkflowExecution(ctx, request) } @@ -256,7 +281,9 @@ func (p *executionPersistenceClient) ConflictResolveWorkflowExecution( caller := headers.GetCallerInfo(ctx).CallerName startTime := time.Now().UTC() defer func() { - p.recordRequestMetrics(metrics.PersistenceConflictResolveWorkflowExecutionScope, caller, startTime, retErr) + latency := time.Since(startTime) + p.healthSignals.Record(request.ShardID, latency, retErr) + p.recordRequestMetrics(metrics.PersistenceConflictResolveWorkflowExecutionScope, caller, latency, retErr) }() return p.persistence.ConflictResolveWorkflowExecution(ctx, request) } @@ -268,7 +295,9 @@ func (p *executionPersistenceClient) DeleteWorkflowExecution( caller := headers.GetCallerInfo(ctx).CallerName startTime := time.Now().UTC() defer func() { - p.recordRequestMetrics(metrics.PersistenceDeleteWorkflowExecutionScope, caller, startTime, retErr) + latency := time.Since(startTime) + p.healthSignals.Record(request.ShardID, latency, retErr) + p.recordRequestMetrics(metrics.PersistenceDeleteWorkflowExecutionScope, caller, latency, retErr) }() return p.persistence.DeleteWorkflowExecution(ctx, request) } @@ -280,7 +309,9 @@ func (p *executionPersistenceClient) DeleteCurrentWorkflowExecution( caller := headers.GetCallerInfo(ctx).CallerName startTime := time.Now().UTC() defer func() { - p.recordRequestMetrics(metrics.PersistenceDeleteCurrentWorkflowExecutionScope, caller, startTime, retErr) + latency := time.Since(startTime) + p.healthSignals.Record(request.ShardID, latency, retErr) + p.recordRequestMetrics(metrics.PersistenceDeleteCurrentWorkflowExecutionScope, caller, latency, retErr) }() return p.persistence.DeleteCurrentWorkflowExecution(ctx, request) } @@ -292,7 +323,9 @@ func (p *executionPersistenceClient) GetCurrentExecution( caller := headers.GetCallerInfo(ctx).CallerName startTime := time.Now().UTC() defer func() { - p.recordRequestMetrics(metrics.PersistenceGetCurrentExecutionScope, caller, startTime, retErr) + latency := time.Since(startTime) + p.healthSignals.Record(request.ShardID, latency, retErr) + p.recordRequestMetrics(metrics.PersistenceGetCurrentExecutionScope, caller, latency, retErr) }() return p.persistence.GetCurrentExecution(ctx, request) } @@ -304,7 +337,9 @@ func (p *executionPersistenceClient) ListConcreteExecutions( caller := headers.GetCallerInfo(ctx).CallerName startTime := time.Now().UTC() defer func() { - p.recordRequestMetrics(metrics.PersistenceListConcreteExecutionsScope, caller, startTime, retErr) + latency := time.Since(startTime) + p.healthSignals.Record(request.ShardID, latency, retErr) + p.recordRequestMetrics(metrics.PersistenceListConcreteExecutionsScope, caller, latency, retErr) }() return p.persistence.ListConcreteExecutions(ctx, request) } @@ -343,7 +378,9 @@ func (p *executionPersistenceClient) AddHistoryTasks( caller := headers.GetCallerInfo(ctx).CallerName startTime := time.Now().UTC() defer func() { - p.recordRequestMetrics(metrics.PersistenceAddTasksScope, caller, startTime, retErr) + latency := time.Since(startTime) + p.healthSignals.Record(request.ShardID, latency, retErr) + p.recordRequestMetrics(metrics.PersistenceAddTasksScope, caller, latency, retErr) }() return p.persistence.AddHistoryTasks(ctx, request) } @@ -371,7 +408,9 @@ func (p *executionPersistenceClient) GetHistoryTasks( caller := headers.GetCallerInfo(ctx).CallerName startTime := time.Now().UTC() defer func() { - p.recordRequestMetrics(operation, caller, startTime, retErr) + latency := time.Since(startTime) + p.healthSignals.Record(request.ShardID, latency, retErr) + p.recordRequestMetrics(operation, caller, latency, retErr) }() return p.persistence.GetHistoryTasks(ctx, request) } @@ -399,7 +438,9 @@ func (p *executionPersistenceClient) CompleteHistoryTask( caller := headers.GetCallerInfo(ctx).CallerName startTime := time.Now().UTC() defer func() { - p.recordRequestMetrics(operation, caller, startTime, retErr) + latency := time.Since(startTime) + p.healthSignals.Record(request.ShardID, latency, retErr) + p.recordRequestMetrics(operation, caller, latency, retErr) }() return p.persistence.CompleteHistoryTask(ctx, request) } @@ -427,7 +468,9 @@ func (p *executionPersistenceClient) RangeCompleteHistoryTasks( caller := headers.GetCallerInfo(ctx).CallerName startTime := time.Now().UTC() defer func() { - p.recordRequestMetrics(operation, caller, startTime, retErr) + latency := time.Since(startTime) + p.healthSignals.Record(request.ShardID, latency, retErr) + p.recordRequestMetrics(operation, caller, latency, retErr) }() return p.persistence.RangeCompleteHistoryTasks(ctx, request) } @@ -439,7 +482,9 @@ func (p *executionPersistenceClient) PutReplicationTaskToDLQ( caller := headers.GetCallerInfo(ctx).CallerName startTime := time.Now().UTC() defer func() { - p.recordRequestMetrics(metrics.PersistencePutReplicationTaskToDLQScope, caller, startTime, retErr) + latency := time.Since(startTime) + p.healthSignals.Record(request.ShardID, latency, retErr) + p.recordRequestMetrics(metrics.PersistencePutReplicationTaskToDLQScope, caller, latency, retErr) }() return p.persistence.PutReplicationTaskToDLQ(ctx, request) } @@ -451,7 +496,9 @@ func (p *executionPersistenceClient) GetReplicationTasksFromDLQ( caller := headers.GetCallerInfo(ctx).CallerName startTime := time.Now().UTC() defer func() { - p.recordRequestMetrics(metrics.PersistenceGetReplicationTasksFromDLQScope, caller, startTime, retErr) + latency := time.Since(startTime) + p.healthSignals.Record(request.ShardID, latency, retErr) + p.recordRequestMetrics(metrics.PersistenceGetReplicationTasksFromDLQScope, caller, latency, retErr) }() return p.persistence.GetReplicationTasksFromDLQ(ctx, request) } @@ -463,7 +510,9 @@ func (p *executionPersistenceClient) DeleteReplicationTaskFromDLQ( caller := headers.GetCallerInfo(ctx).CallerName startTime := time.Now().UTC() defer func() { - p.recordRequestMetrics(metrics.PersistenceDeleteReplicationTaskFromDLQScope, caller, startTime, retErr) + latency := time.Since(startTime) + p.healthSignals.Record(request.ShardID, latency, retErr) + p.recordRequestMetrics(metrics.PersistenceDeleteReplicationTaskFromDLQScope, caller, latency, retErr) }() return p.persistence.DeleteReplicationTaskFromDLQ(ctx, request) } @@ -475,7 +524,9 @@ func (p *executionPersistenceClient) RangeDeleteReplicationTaskFromDLQ( caller := headers.GetCallerInfo(ctx).CallerName startTime := time.Now().UTC() defer func() { - p.recordRequestMetrics(metrics.PersistenceRangeDeleteReplicationTaskFromDLQScope, caller, startTime, retErr) + latency := time.Since(startTime) + p.healthSignals.Record(request.ShardID, latency, retErr) + p.recordRequestMetrics(metrics.PersistenceRangeDeleteReplicationTaskFromDLQScope, caller, latency, retErr) }() return p.persistence.RangeDeleteReplicationTaskFromDLQ(ctx, request) } @@ -487,7 +538,9 @@ func (p *executionPersistenceClient) IsReplicationDLQEmpty( caller := headers.GetCallerInfo(ctx).CallerName startTime := time.Now().UTC() defer func() { - p.recordRequestMetrics(metrics.PersistenceGetReplicationTasksFromDLQScope, caller, startTime, retErr) + latency := time.Since(startTime) + p.healthSignals.Record(request.ShardID, latency, retErr) + p.recordRequestMetrics(metrics.PersistenceGetReplicationTasksFromDLQScope, caller, latency, retErr) }() return p.persistence.IsReplicationDLQEmpty(ctx, request) } @@ -507,7 +560,9 @@ func (p *taskPersistenceClient) CreateTasks( caller := headers.GetCallerInfo(ctx).CallerName startTime := time.Now().UTC() defer func() { - p.recordRequestMetrics(metrics.PersistenceCreateTasksScope, caller, startTime, retErr) + latency := time.Since(startTime) + p.healthSignals.Record(CallerSegmentMissing, latency, retErr) + p.recordRequestMetrics(metrics.PersistenceCreateTasksScope, caller, latency, retErr) }() return p.persistence.CreateTasks(ctx, request) } @@ -519,7 +574,9 @@ func (p *taskPersistenceClient) GetTasks( caller := headers.GetCallerInfo(ctx).CallerName startTime := time.Now().UTC() defer func() { - p.recordRequestMetrics(metrics.PersistenceGetTasksScope, caller, startTime, retErr) + latency := time.Since(startTime) + p.healthSignals.Record(CallerSegmentMissing, latency, retErr) + p.recordRequestMetrics(metrics.PersistenceGetTasksScope, caller, latency, retErr) }() return p.persistence.GetTasks(ctx, request) } @@ -531,7 +588,9 @@ func (p *taskPersistenceClient) CompleteTask( caller := headers.GetCallerInfo(ctx).CallerName startTime := time.Now().UTC() defer func() { - p.recordRequestMetrics(metrics.PersistenceCompleteTaskScope, caller, startTime, retErr) + latency := time.Since(startTime) + p.healthSignals.Record(CallerSegmentMissing, latency, retErr) + p.recordRequestMetrics(metrics.PersistenceCompleteTaskScope, caller, latency, retErr) }() return p.persistence.CompleteTask(ctx, request) } @@ -543,7 +602,9 @@ func (p *taskPersistenceClient) CompleteTasksLessThan( caller := headers.GetCallerInfo(ctx).CallerName startTime := time.Now().UTC() defer func() { - p.recordRequestMetrics(metrics.PersistenceCompleteTasksLessThanScope, caller, startTime, retErr) + latency := time.Since(startTime) + p.healthSignals.Record(CallerSegmentMissing, latency, retErr) + p.recordRequestMetrics(metrics.PersistenceCompleteTasksLessThanScope, caller, latency, retErr) }() return p.persistence.CompleteTasksLessThan(ctx, request) } @@ -555,7 +616,9 @@ func (p *taskPersistenceClient) CreateTaskQueue( caller := headers.GetCallerInfo(ctx).CallerName startTime := time.Now().UTC() defer func() { - p.recordRequestMetrics(metrics.PersistenceCreateTaskQueueScope, caller, startTime, retErr) + latency := time.Since(startTime) + p.healthSignals.Record(CallerSegmentMissing, latency, retErr) + p.recordRequestMetrics(metrics.PersistenceCreateTaskQueueScope, caller, latency, retErr) }() return p.persistence.CreateTaskQueue(ctx, request) } @@ -567,7 +630,9 @@ func (p *taskPersistenceClient) UpdateTaskQueue( caller := headers.GetCallerInfo(ctx).CallerName startTime := time.Now().UTC() defer func() { - p.recordRequestMetrics(metrics.PersistenceUpdateTaskQueueScope, caller, startTime, retErr) + latency := time.Since(startTime) + p.healthSignals.Record(CallerSegmentMissing, latency, retErr) + p.recordRequestMetrics(metrics.PersistenceUpdateTaskQueueScope, caller, latency, retErr) }() return p.persistence.UpdateTaskQueue(ctx, request) } @@ -579,7 +644,9 @@ func (p *taskPersistenceClient) GetTaskQueue( caller := headers.GetCallerInfo(ctx).CallerName startTime := time.Now().UTC() defer func() { - p.recordRequestMetrics(metrics.PersistenceGetTaskQueueScope, caller, startTime, retErr) + latency := time.Since(startTime) + p.healthSignals.Record(CallerSegmentMissing, latency, retErr) + p.recordRequestMetrics(metrics.PersistenceGetTaskQueueScope, caller, latency, retErr) }() return p.persistence.GetTaskQueue(ctx, request) } @@ -591,7 +658,9 @@ func (p *taskPersistenceClient) ListTaskQueue( caller := headers.GetCallerInfo(ctx).CallerName startTime := time.Now().UTC() defer func() { - p.recordRequestMetrics(metrics.PersistenceListTaskQueueScope, caller, startTime, retErr) + latency := time.Since(startTime) + p.healthSignals.Record(CallerSegmentMissing, latency, retErr) + p.recordRequestMetrics(metrics.PersistenceListTaskQueueScope, caller, latency, retErr) }() return p.persistence.ListTaskQueue(ctx, request) } @@ -603,7 +672,9 @@ func (p *taskPersistenceClient) DeleteTaskQueue( caller := headers.GetCallerInfo(ctx).CallerName startTime := time.Now().UTC() defer func() { - p.recordRequestMetrics(metrics.PersistenceDeleteTaskQueueScope, caller, startTime, retErr) + latency := time.Since(startTime) + p.healthSignals.Record(CallerSegmentMissing, latency, retErr) + p.recordRequestMetrics(metrics.PersistenceDeleteTaskQueueScope, caller, latency, retErr) }() return p.persistence.DeleteTaskQueue(ctx, request) } @@ -623,7 +694,9 @@ func (p *metadataPersistenceClient) CreateNamespace( caller := headers.GetCallerInfo(ctx).CallerName startTime := time.Now().UTC() defer func() { - p.recordRequestMetrics(metrics.PersistenceCreateNamespaceScope, caller, startTime, retErr) + latency := time.Since(startTime) + p.healthSignals.Record(CallerSegmentMissing, latency, retErr) + p.recordRequestMetrics(metrics.PersistenceCreateNamespaceScope, caller, latency, retErr) }() return p.persistence.CreateNamespace(ctx, request) } @@ -635,7 +708,9 @@ func (p *metadataPersistenceClient) GetNamespace( caller := headers.GetCallerInfo(ctx).CallerName startTime := time.Now().UTC() defer func() { - p.recordRequestMetrics(metrics.PersistenceGetNamespaceScope, caller, startTime, retErr) + latency := time.Since(startTime) + p.healthSignals.Record(CallerSegmentMissing, latency, retErr) + p.recordRequestMetrics(metrics.PersistenceGetNamespaceScope, caller, latency, retErr) }() return p.persistence.GetNamespace(ctx, request) } @@ -647,7 +722,9 @@ func (p *metadataPersistenceClient) UpdateNamespace( caller := headers.GetCallerInfo(ctx).CallerName startTime := time.Now().UTC() defer func() { - p.recordRequestMetrics(metrics.PersistenceUpdateNamespaceScope, caller, startTime, retErr) + latency := time.Since(startTime) + p.healthSignals.Record(CallerSegmentMissing, latency, retErr) + p.recordRequestMetrics(metrics.PersistenceUpdateNamespaceScope, caller, latency, retErr) }() return p.persistence.UpdateNamespace(ctx, request) } @@ -659,7 +736,9 @@ func (p *metadataPersistenceClient) RenameNamespace( caller := headers.GetCallerInfo(ctx).CallerName startTime := time.Now().UTC() defer func() { - p.recordRequestMetrics(metrics.PersistenceRenameNamespaceScope, caller, startTime, retErr) + latency := time.Since(startTime) + p.healthSignals.Record(CallerSegmentMissing, latency, retErr) + p.recordRequestMetrics(metrics.PersistenceRenameNamespaceScope, caller, latency, retErr) }() return p.persistence.RenameNamespace(ctx, request) } @@ -671,7 +750,9 @@ func (p *metadataPersistenceClient) DeleteNamespace( caller := headers.GetCallerInfo(ctx).CallerName startTime := time.Now().UTC() defer func() { - p.recordRequestMetrics(metrics.PersistenceDeleteNamespaceScope, caller, startTime, retErr) + latency := time.Since(startTime) + p.healthSignals.Record(CallerSegmentMissing, latency, retErr) + p.recordRequestMetrics(metrics.PersistenceDeleteNamespaceScope, caller, latency, retErr) }() return p.persistence.DeleteNamespace(ctx, request) } @@ -683,7 +764,9 @@ func (p *metadataPersistenceClient) DeleteNamespaceByName( caller := headers.GetCallerInfo(ctx).CallerName startTime := time.Now().UTC() defer func() { - p.recordRequestMetrics(metrics.PersistenceDeleteNamespaceByNameScope, caller, startTime, retErr) + latency := time.Since(startTime) + p.healthSignals.Record(CallerSegmentMissing, latency, retErr) + p.recordRequestMetrics(metrics.PersistenceDeleteNamespaceByNameScope, caller, latency, retErr) }() return p.persistence.DeleteNamespaceByName(ctx, request) } @@ -695,7 +778,9 @@ func (p *metadataPersistenceClient) ListNamespaces( caller := headers.GetCallerInfo(ctx).CallerName startTime := time.Now().UTC() defer func() { - p.recordRequestMetrics(metrics.PersistenceListNamespacesScope, caller, startTime, retErr) + latency := time.Since(startTime) + p.healthSignals.Record(CallerSegmentMissing, latency, retErr) + p.recordRequestMetrics(metrics.PersistenceListNamespacesScope, caller, latency, retErr) }() return p.persistence.ListNamespaces(ctx, request) } @@ -706,7 +791,9 @@ func (p *metadataPersistenceClient) GetMetadata( caller := headers.GetCallerInfo(ctx).CallerName startTime := time.Now().UTC() defer func() { - p.recordRequestMetrics(metrics.PersistenceGetMetadataScope, caller, startTime, retErr) + latency := time.Since(startTime) + p.healthSignals.Record(CallerSegmentMissing, latency, retErr) + p.recordRequestMetrics(metrics.PersistenceGetMetadataScope, caller, latency, retErr) }() return p.persistence.GetMetadata(ctx) } @@ -723,7 +810,9 @@ func (p *executionPersistenceClient) AppendHistoryNodes( caller := headers.GetCallerInfo(ctx).CallerName startTime := time.Now().UTC() defer func() { - p.recordRequestMetrics(metrics.PersistenceAppendHistoryNodesScope, caller, startTime, retErr) + latency := time.Since(startTime) + p.healthSignals.Record(CallerSegmentMissing, latency, retErr) + p.recordRequestMetrics(metrics.PersistenceAppendHistoryNodesScope, caller, latency, retErr) }() return p.persistence.AppendHistoryNodes(ctx, request) } @@ -736,7 +825,9 @@ func (p *executionPersistenceClient) AppendRawHistoryNodes( caller := headers.GetCallerInfo(ctx).CallerName startTime := time.Now().UTC() defer func() { - p.recordRequestMetrics(metrics.PersistenceAppendRawHistoryNodesScope, caller, startTime, retErr) + latency := time.Since(startTime) + p.healthSignals.Record(CallerSegmentMissing, latency, retErr) + p.recordRequestMetrics(metrics.PersistenceAppendRawHistoryNodesScope, caller, latency, retErr) }() return p.persistence.AppendRawHistoryNodes(ctx, request) } @@ -749,7 +840,9 @@ func (p *executionPersistenceClient) ReadHistoryBranch( caller := headers.GetCallerInfo(ctx).CallerName startTime := time.Now().UTC() defer func() { - p.recordRequestMetrics(metrics.PersistenceReadHistoryBranchScope, caller, startTime, retErr) + latency := time.Since(startTime) + p.healthSignals.Record(CallerSegmentMissing, latency, retErr) + p.recordRequestMetrics(metrics.PersistenceReadHistoryBranchScope, caller, latency, retErr) }() return p.persistence.ReadHistoryBranch(ctx, request) } @@ -761,7 +854,9 @@ func (p *executionPersistenceClient) ReadHistoryBranchReverse( caller := headers.GetCallerInfo(ctx).CallerName startTime := time.Now().UTC() defer func() { - p.recordRequestMetrics(metrics.PersistenceReadHistoryBranchReverseScope, caller, startTime, retErr) + latency := time.Since(startTime) + p.healthSignals.Record(CallerSegmentMissing, latency, retErr) + p.recordRequestMetrics(metrics.PersistenceReadHistoryBranchReverseScope, caller, latency, retErr) }() return p.persistence.ReadHistoryBranchReverse(ctx, request) } @@ -774,7 +869,9 @@ func (p *executionPersistenceClient) ReadHistoryBranchByBatch( caller := headers.GetCallerInfo(ctx).CallerName startTime := time.Now().UTC() defer func() { - p.recordRequestMetrics(metrics.PersistenceReadHistoryBranchScope, caller, startTime, retErr) + latency := time.Since(startTime) + p.healthSignals.Record(CallerSegmentMissing, latency, retErr) + p.recordRequestMetrics(metrics.PersistenceReadHistoryBranchScope, caller, latency, retErr) }() return p.persistence.ReadHistoryBranchByBatch(ctx, request) } @@ -787,7 +884,9 @@ func (p *executionPersistenceClient) ReadRawHistoryBranch( caller := headers.GetCallerInfo(ctx).CallerName startTime := time.Now().UTC() defer func() { - p.recordRequestMetrics(metrics.PersistenceReadRawHistoryBranchScope, caller, startTime, retErr) + latency := time.Since(startTime) + p.healthSignals.Record(CallerSegmentMissing, latency, retErr) + p.recordRequestMetrics(metrics.PersistenceReadRawHistoryBranchScope, caller, latency, retErr) }() return p.persistence.ReadRawHistoryBranch(ctx, request) } @@ -800,7 +899,9 @@ func (p *executionPersistenceClient) ForkHistoryBranch( caller := headers.GetCallerInfo(ctx).CallerName startTime := time.Now().UTC() defer func() { - p.recordRequestMetrics(metrics.PersistenceForkHistoryBranchScope, caller, startTime, retErr) + latency := time.Since(startTime) + p.healthSignals.Record(CallerSegmentMissing, latency, retErr) + p.recordRequestMetrics(metrics.PersistenceForkHistoryBranchScope, caller, latency, retErr) }() return p.persistence.ForkHistoryBranch(ctx, request) } @@ -813,7 +914,9 @@ func (p *executionPersistenceClient) DeleteHistoryBranch( caller := headers.GetCallerInfo(ctx).CallerName startTime := time.Now().UTC() defer func() { - p.recordRequestMetrics(metrics.PersistenceDeleteHistoryBranchScope, caller, startTime, retErr) + latency := time.Since(startTime) + p.healthSignals.Record(CallerSegmentMissing, latency, retErr) + p.recordRequestMetrics(metrics.PersistenceDeleteHistoryBranchScope, caller, latency, retErr) }() return p.persistence.DeleteHistoryBranch(ctx, request) } @@ -826,7 +929,9 @@ func (p *executionPersistenceClient) TrimHistoryBranch( caller := headers.GetCallerInfo(ctx).CallerName startTime := time.Now().UTC() defer func() { - p.recordRequestMetrics(metrics.PersistenceTrimHistoryBranchScope, caller, startTime, retErr) + latency := time.Since(startTime) + p.healthSignals.Record(CallerSegmentMissing, latency, retErr) + p.recordRequestMetrics(metrics.PersistenceTrimHistoryBranchScope, caller, latency, retErr) }() return p.persistence.TrimHistoryBranch(ctx, request) } @@ -838,7 +943,9 @@ func (p *executionPersistenceClient) GetAllHistoryTreeBranches( caller := headers.GetCallerInfo(ctx).CallerName startTime := time.Now().UTC() defer func() { - p.recordRequestMetrics(metrics.PersistenceGetAllHistoryTreeBranchesScope, caller, startTime, retErr) + latency := time.Since(startTime) + p.healthSignals.Record(CallerSegmentMissing, latency, retErr) + p.recordRequestMetrics(metrics.PersistenceGetAllHistoryTreeBranchesScope, caller, latency, retErr) }() return p.persistence.GetAllHistoryTreeBranches(ctx, request) } @@ -851,7 +958,9 @@ func (p *executionPersistenceClient) GetHistoryTree( caller := headers.GetCallerInfo(ctx).CallerName startTime := time.Now().UTC() defer func() { - p.recordRequestMetrics(metrics.PersistenceGetHistoryTreeScope, caller, startTime, retErr) + latency := time.Since(startTime) + p.healthSignals.Record(CallerSegmentMissing, latency, retErr) + p.recordRequestMetrics(metrics.PersistenceGetHistoryTreeScope, caller, latency, retErr) }() return p.persistence.GetHistoryTree(ctx, request) } @@ -870,7 +979,9 @@ func (p *queuePersistenceClient) EnqueueMessage( caller := headers.GetCallerInfo(ctx).CallerName startTime := time.Now().UTC() defer func() { - p.recordRequestMetrics(metrics.PersistenceEnqueueMessageScope, caller, startTime, retErr) + latency := time.Since(startTime) + p.healthSignals.Record(CallerSegmentMissing, latency, retErr) + p.recordRequestMetrics(metrics.PersistenceEnqueueMessageScope, caller, latency, retErr) }() return p.persistence.EnqueueMessage(ctx, blob) } @@ -883,7 +994,9 @@ func (p *queuePersistenceClient) ReadMessages( caller := headers.GetCallerInfo(ctx).CallerName startTime := time.Now().UTC() defer func() { - p.recordRequestMetrics(metrics.PersistenceReadQueueMessagesScope, caller, startTime, retErr) + latency := time.Since(startTime) + p.healthSignals.Record(CallerSegmentMissing, latency, retErr) + p.recordRequestMetrics(metrics.PersistenceReadQueueMessagesScope, caller, latency, retErr) }() return p.persistence.ReadMessages(ctx, lastMessageID, maxCount) } @@ -895,7 +1008,9 @@ func (p *queuePersistenceClient) UpdateAckLevel( caller := headers.GetCallerInfo(ctx).CallerName startTime := time.Now().UTC() defer func() { - p.recordRequestMetrics(metrics.PersistenceUpdateAckLevelScope, caller, startTime, retErr) + latency := time.Since(startTime) + p.healthSignals.Record(CallerSegmentMissing, latency, retErr) + p.recordRequestMetrics(metrics.PersistenceUpdateAckLevelScope, caller, latency, retErr) }() return p.persistence.UpdateAckLevel(ctx, metadata) } @@ -906,7 +1021,9 @@ func (p *queuePersistenceClient) GetAckLevels( caller := headers.GetCallerInfo(ctx).CallerName startTime := time.Now().UTC() defer func() { - p.recordRequestMetrics(metrics.PersistenceGetAckLevelScope, caller, startTime, retErr) + latency := time.Since(startTime) + p.healthSignals.Record(CallerSegmentMissing, latency, retErr) + p.recordRequestMetrics(metrics.PersistenceGetAckLevelScope, caller, latency, retErr) }() return p.persistence.GetAckLevels(ctx) } @@ -918,7 +1035,9 @@ func (p *queuePersistenceClient) DeleteMessagesBefore( caller := headers.GetCallerInfo(ctx).CallerName startTime := time.Now().UTC() defer func() { - p.recordRequestMetrics(metrics.PersistenceDeleteMessagesBeforeScope, caller, startTime, retErr) + latency := time.Since(startTime) + p.healthSignals.Record(CallerSegmentMissing, latency, retErr) + p.recordRequestMetrics(metrics.PersistenceDeleteMessagesBeforeScope, caller, latency, retErr) }() return p.persistence.DeleteMessagesBefore(ctx, messageID) } @@ -930,7 +1049,9 @@ func (p *queuePersistenceClient) EnqueueMessageToDLQ( caller := headers.GetCallerInfo(ctx).CallerName startTime := time.Now().UTC() defer func() { - p.recordRequestMetrics(metrics.PersistenceEnqueueMessageToDLQScope, caller, startTime, retErr) + latency := time.Since(startTime) + p.healthSignals.Record(CallerSegmentMissing, latency, retErr) + p.recordRequestMetrics(metrics.PersistenceEnqueueMessageToDLQScope, caller, latency, retErr) }() return p.persistence.EnqueueMessageToDLQ(ctx, blob) } @@ -945,7 +1066,9 @@ func (p *queuePersistenceClient) ReadMessagesFromDLQ( caller := headers.GetCallerInfo(ctx).CallerName startTime := time.Now().UTC() defer func() { - p.recordRequestMetrics(metrics.PersistenceReadMessagesFromDLQScope, caller, startTime, retErr) + latency := time.Since(startTime) + p.healthSignals.Record(CallerSegmentMissing, latency, retErr) + p.recordRequestMetrics(metrics.PersistenceReadMessagesFromDLQScope, caller, latency, retErr) }() return p.persistence.ReadMessagesFromDLQ(ctx, firstMessageID, lastMessageID, pageSize, pageToken) } @@ -957,7 +1080,9 @@ func (p *queuePersistenceClient) DeleteMessageFromDLQ( caller := headers.GetCallerInfo(ctx).CallerName startTime := time.Now().UTC() defer func() { - p.recordRequestMetrics(metrics.PersistenceDeleteMessageFromDLQScope, caller, startTime, retErr) + latency := time.Since(startTime) + p.healthSignals.Record(CallerSegmentMissing, latency, retErr) + p.recordRequestMetrics(metrics.PersistenceDeleteMessageFromDLQScope, caller, latency, retErr) }() return p.persistence.DeleteMessageFromDLQ(ctx, messageID) } @@ -970,7 +1095,9 @@ func (p *queuePersistenceClient) RangeDeleteMessagesFromDLQ( caller := headers.GetCallerInfo(ctx).CallerName startTime := time.Now().UTC() defer func() { - p.recordRequestMetrics(metrics.PersistenceRangeDeleteMessagesFromDLQScope, caller, startTime, retErr) + latency := time.Since(startTime) + p.healthSignals.Record(CallerSegmentMissing, latency, retErr) + p.recordRequestMetrics(metrics.PersistenceRangeDeleteMessagesFromDLQScope, caller, latency, retErr) }() return p.persistence.RangeDeleteMessagesFromDLQ(ctx, firstMessageID, lastMessageID) } @@ -982,7 +1109,9 @@ func (p *queuePersistenceClient) UpdateDLQAckLevel( caller := headers.GetCallerInfo(ctx).CallerName startTime := time.Now().UTC() defer func() { - p.recordRequestMetrics(metrics.PersistenceUpdateDLQAckLevelScope, caller, startTime, retErr) + latency := time.Since(startTime) + p.healthSignals.Record(CallerSegmentMissing, latency, retErr) + p.recordRequestMetrics(metrics.PersistenceUpdateDLQAckLevelScope, caller, latency, retErr) }() return p.persistence.UpdateDLQAckLevel(ctx, metadata) } @@ -993,7 +1122,9 @@ func (p *queuePersistenceClient) GetDLQAckLevels( caller := headers.GetCallerInfo(ctx).CallerName startTime := time.Now().UTC() defer func() { - p.recordRequestMetrics(metrics.PersistenceGetDLQAckLevelScope, caller, startTime, retErr) + latency := time.Since(startTime) + p.healthSignals.Record(CallerSegmentMissing, latency, retErr) + p.recordRequestMetrics(metrics.PersistenceGetDLQAckLevelScope, caller, latency, retErr) }() return p.persistence.GetDLQAckLevels(ctx) } @@ -1013,7 +1144,9 @@ func (p *clusterMetadataPersistenceClient) ListClusterMetadata( caller := headers.GetCallerInfo(ctx).CallerName startTime := time.Now().UTC() defer func() { - p.recordRequestMetrics(metrics.PersistenceListClusterMetadataScope, caller, startTime, retErr) + latency := time.Since(startTime) + p.healthSignals.Record(CallerSegmentMissing, latency, retErr) + p.recordRequestMetrics(metrics.PersistenceListClusterMetadataScope, caller, latency, retErr) }() return p.persistence.ListClusterMetadata(ctx, request) } @@ -1024,7 +1157,9 @@ func (p *clusterMetadataPersistenceClient) GetCurrentClusterMetadata( caller := headers.GetCallerInfo(ctx).CallerName startTime := time.Now().UTC() defer func() { - p.recordRequestMetrics(metrics.PersistenceGetCurrentClusterMetadataScope, caller, startTime, retErr) + latency := time.Since(startTime) + p.healthSignals.Record(CallerSegmentMissing, latency, retErr) + p.recordRequestMetrics(metrics.PersistenceGetCurrentClusterMetadataScope, caller, latency, retErr) }() return p.persistence.GetCurrentClusterMetadata(ctx) } @@ -1036,7 +1171,9 @@ func (p *clusterMetadataPersistenceClient) GetClusterMetadata( caller := headers.GetCallerInfo(ctx).CallerName startTime := time.Now().UTC() defer func() { - p.recordRequestMetrics(metrics.PersistenceGetClusterMetadataScope, caller, startTime, retErr) + latency := time.Since(startTime) + p.healthSignals.Record(CallerSegmentMissing, latency, retErr) + p.recordRequestMetrics(metrics.PersistenceGetClusterMetadataScope, caller, latency, retErr) }() return p.persistence.GetClusterMetadata(ctx, request) } @@ -1048,7 +1185,9 @@ func (p *clusterMetadataPersistenceClient) SaveClusterMetadata( caller := headers.GetCallerInfo(ctx).CallerName startTime := time.Now().UTC() defer func() { - p.recordRequestMetrics(metrics.PersistenceSaveClusterMetadataScope, caller, startTime, retErr) + latency := time.Since(startTime) + p.healthSignals.Record(CallerSegmentMissing, latency, retErr) + p.recordRequestMetrics(metrics.PersistenceSaveClusterMetadataScope, caller, latency, retErr) }() return p.persistence.SaveClusterMetadata(ctx, request) } @@ -1060,7 +1199,9 @@ func (p *clusterMetadataPersistenceClient) DeleteClusterMetadata( caller := headers.GetCallerInfo(ctx).CallerName startTime := time.Now().UTC() defer func() { - p.recordRequestMetrics(metrics.PersistenceDeleteClusterMetadataScope, caller, startTime, retErr) + latency := time.Since(startTime) + p.healthSignals.Record(CallerSegmentMissing, latency, retErr) + p.recordRequestMetrics(metrics.PersistenceDeleteClusterMetadataScope, caller, latency, retErr) }() return p.persistence.DeleteClusterMetadata(ctx, request) } @@ -1076,7 +1217,9 @@ func (p *clusterMetadataPersistenceClient) GetClusterMembers( caller := headers.GetCallerInfo(ctx).CallerName startTime := time.Now().UTC() defer func() { - p.recordRequestMetrics(metrics.PersistenceGetClusterMembersScope, caller, startTime, retErr) + latency := time.Since(startTime) + p.healthSignals.Record(CallerSegmentMissing, latency, retErr) + p.recordRequestMetrics(metrics.PersistenceGetClusterMembersScope, caller, latency, retErr) }() return p.persistence.GetClusterMembers(ctx, request) } @@ -1088,7 +1231,9 @@ func (p *clusterMetadataPersistenceClient) UpsertClusterMembership( caller := headers.GetCallerInfo(ctx).CallerName startTime := time.Now().UTC() defer func() { - p.recordRequestMetrics(metrics.PersistenceUpsertClusterMembershipScope, caller, startTime, retErr) + latency := time.Since(startTime) + p.healthSignals.Record(CallerSegmentMissing, latency, retErr) + p.recordRequestMetrics(metrics.PersistenceUpsertClusterMembershipScope, caller, latency, retErr) }() return p.persistence.UpsertClusterMembership(ctx, request) } @@ -1100,7 +1245,9 @@ func (p *clusterMetadataPersistenceClient) PruneClusterMembership( caller := headers.GetCallerInfo(ctx).CallerName startTime := time.Now().UTC() defer func() { - p.recordRequestMetrics(metrics.PersistencePruneClusterMembershipScope, caller, startTime, retErr) + latency := time.Since(startTime) + p.healthSignals.Record(CallerSegmentMissing, latency, retErr) + p.recordRequestMetrics(metrics.PersistencePruneClusterMembershipScope, caller, latency, retErr) }() return p.persistence.PruneClusterMembership(ctx, request) } @@ -1112,15 +1259,17 @@ func (p *metadataPersistenceClient) InitializeSystemNamespaces( caller := headers.GetCallerInfo(ctx).CallerName startTime := time.Now().UTC() defer func() { - p.recordRequestMetrics(metrics.PersistenceInitializeSystemNamespaceScope, caller, startTime, retErr) + latency := time.Since(startTime) + p.healthSignals.Record(CallerSegmentMissing, latency, retErr) + p.recordRequestMetrics(metrics.PersistenceInitializeSystemNamespaceScope, caller, latency, retErr) }() return p.persistence.InitializeSystemNamespaces(ctx, currentClusterName) } -func (p *metricEmitter) recordRequestMetrics(operation string, caller string, startTime time.Time, err error) { +func (p *metricEmitter) recordRequestMetrics(operation string, caller string, latency time.Duration, err error) { handler := p.metricsHandler.WithTags(metrics.OperationTag(operation), metrics.NamespaceTag(caller)) handler.Counter(metrics.PersistenceRequests.GetMetricName()).Record(1) - handler.Timer(metrics.PersistenceLatency.GetMetricName()).Record(time.Since(startTime)) + handler.Timer(metrics.PersistenceLatency.GetMetricName()).Record(latency) updateErrorMetric(handler, p.logger, operation, err) } diff --git a/service/history/shard/context_test.go b/service/history/shard/context_test.go index 2e63b5cc6f8..d79b59cc0f9 100644 --- a/service/history/shard/context_test.go +++ b/service/history/shard/context_test.go @@ -122,12 +122,12 @@ func (s *contextSuite) TestOverwriteScheduledTaskTimestamp() { tasks.CategoryTimer, time.Time{}, ) - tasks := map[tasks.Category][]tasks.Task{ + testTasks := map[tasks.Category][]tasks.Task{ tasks.CategoryTimer: {fakeTask}, } s.mockExecutionManager.EXPECT().AddHistoryTasks(gomock.Any(), gomock.Any()).Return(nil).AnyTimes() - s.mockHistoryEngine.EXPECT().NotifyNewTasks(tasks).AnyTimes() + s.mockHistoryEngine.EXPECT().NotifyNewTasks(testTasks).AnyTimes() testCases := []struct { taskTimestamp time.Time @@ -162,7 +162,7 @@ func (s *contextSuite) TestOverwriteScheduledTaskTimestamp() { NamespaceID: workflowKey.NamespaceID, WorkflowID: workflowKey.WorkflowID, RunID: workflowKey.RunID, - Tasks: tasks, + Tasks: testTasks, }, ) s.NoError(err)