Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Hot shard metrics #4365

Merged
merged 42 commits into from
May 26, 2023
Merged
Show file tree
Hide file tree
Changes from 38 commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
0045fb7
moving window average
pdoerner May 18, 2023
ee44c22
remove channel avg impl
pdoerner May 18, 2023
68766b7
add signal aggregator
pdoerner May 18, 2023
bfc7002
adjust record fn
pdoerner May 18, 2023
82abfa4
add health signal clients
pdoerner May 19, 2023
f38f142
inject signal aggregator
pdoerner May 19, 2023
c037475
fix tests
pdoerner May 19, 2023
9543764
Merge branch 'master' into rate-limiter-metrics
pdoerner May 22, 2023
859950e
add metric emission
pdoerner May 22, 2023
e283bfb
race condition
pdoerner May 23, 2023
c43f1a1
Revert "race condition"
pdoerner May 24, 2023
dc5eb2a
Revert "add metric emission"
pdoerner May 24, 2023
217cb96
emit per shard RPS metric
pdoerner May 24, 2023
bf205a4
cleanup
pdoerner May 24, 2023
b8293dc
merge metric and signal clients
pdoerner May 24, 2023
27ec4c5
cleanup
pdoerner May 24, 2023
df989eb
cleanup
pdoerner May 24, 2023
6aa2e7f
Merge branch 'master' into rate-limiter-metrics
pdoerner May 24, 2023
f960396
linting
pdoerner May 24, 2023
3c409ce
remove generics
pdoerner May 24, 2023
fe2955d
cleanup
pdoerner May 24, 2023
f412f60
fix deferred metric fn
pdoerner May 24, 2023
3caed40
fix defer metric fn
pdoerner May 24, 2023
d64ff92
fix clients
pdoerner May 24, 2023
e91a0ee
types
pdoerner May 24, 2023
db4db37
acquire lock once
pdoerner May 24, 2023
c256fb1
locks
pdoerner May 24, 2023
98f2b66
array moving average
pdoerner May 24, 2023
f3dfa3d
Revert "array moving average"
pdoerner May 24, 2023
fc62af7
Merge branch 'master' into rate-limiter-metrics
pdoerner May 24, 2023
33448c8
cleanup
pdoerner May 24, 2023
5a94697
emit per shard RPS
pdoerner May 24, 2023
e179473
Merge branch 'master' into rate-limiter-metrics
pdoerner May 24, 2023
e0ce388
array average
pdoerner May 25, 2023
e48fcec
feedback
pdoerner May 25, 2023
d3b20f1
cleanup
pdoerner May 25, 2023
c42a921
Merge branch 'master' into rate-limiter-metrics
pdoerner May 25, 2023
b6fd862
handle nil health signals
pdoerner May 25, 2023
0bd90b0
feedback
pdoerner May 25, 2023
0911a11
allow start and stop when not enabled
pdoerner May 25, 2023
df5f722
some improvements
yycptt May 25, 2023
937c0a9
Merge branch 'master' into rate-limiter-metrics
yycptt May 26, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 49 additions & 0 deletions common/aggregate/bench_moving_window_avg_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// The MIT License
//
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package aggregate

import (
"math/rand"
"testing"
"time"
)

// Recorded benchmark output:
// BenchmarkArrayMovingWindowAvg
// BenchmarkArrayMovingWindowAvg-10 17021074 66.27 ns/op

// Parameters handed to NewMovingWindowAvgImpl by BenchmarkArrayMovingWindowAvg.
const (
// testWindowSize is the window duration used for the benchmarked average.
testWindowSize = 10 * time.Millisecond
// testBufferSize is the buffer size used for the benchmarked average.
testBufferSize = 200
)

// BenchmarkArrayMovingWindowAvg measures Record on a moving window average
// fed with random values, with an Average call mixed in on every tenth
// iteration (including the first).
func BenchmarkArrayMovingWindowAvg(b *testing.B) {
	window := NewMovingWindowAvgImpl(testWindowSize, testBufferSize)
	for iter := 0; iter < b.N; iter++ {
		window.Record(rand.Int63())
		if iter%10 != 0 {
			continue
		}
		// Result is intentionally discarded; only the cost of the call matters here.
		window.Average()
	}
}
103 changes: 103 additions & 0 deletions common/aggregate/moving_window_average.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
// The MIT License
//
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package aggregate

import (
"sync"
"time"
)

type (
// MovingWindowAverage accepts int64 samples and reports their average.
MovingWindowAverage interface {
// Record adds a sample.
Record(val int64)
// Average returns the average of the samples currently tracked.
Average() float64
}

// timestampedData is one buffered sample together with the time it was recorded.
timestampedData struct {
value int64
timestamp time.Time
}

// MovingWindowAvgImpl is a MovingWindowAverage backed by a circular buffer.
// Record and Average both take the embedded mutex before touching any field.
MovingWindowAvgImpl struct {
sync.Mutex
// windowSize is the age at which expireOldValuesLocked drops a sample.
windowSize time.Duration
// maxBufferSize is the length of buffer; indices wrap modulo this value.
maxBufferSize int
// buffer is the circular storage for samples.
buffer []timestampedData
// headIdx is the index of the oldest tracked sample.
headIdx int
// tailIdx is the index of the slot the next Record writes to.
// headIdx == tailIdx means no samples are tracked.
tailIdx int
// sum is the running total of the values of the tracked samples.
sum int64
// count is the number of tracked samples.
count int64
}
)

// NewMovingWindowAvgImpl returns an empty moving window average whose samples
// expire once they are at least windowSize old and whose circular buffer has
// maxBufferSize slots.
//
// maxBufferSize is expected to be positive: Record indexes into the buffer
// and wraps indices modulo maxBufferSize.
func NewMovingWindowAvgImpl(
	windowSize time.Duration,
	maxBufferSize int,
) *MovingWindowAvgImpl {
	avg := new(MovingWindowAvgImpl)
	avg.windowSize = windowSize
	avg.maxBufferSize = maxBufferSize
	avg.buffer = make([]timestampedData, maxBufferSize)
	return avg
}

// Record stores val, stamped with the current time, in the next free slot of
// the circular buffer and adds it to the running sum and count. If advancing
// the tail makes it catch up with the head, the sample at the head is evicted
// so that head == tail never represents a full buffer.
func (a *MovingWindowAvgImpl) Record(val int64) {
	a.Lock()
	defer a.Unlock()

	slot := a.tailIdx
	a.buffer[slot] = timestampedData{value: val, timestamp: time.Now()}
	a.tailIdx = (slot + 1) % a.maxBufferSize
	a.sum, a.count = a.sum+val, a.count+1

	if a.headIdx != a.tailIdx {
		return
	}

	// Tail caught up with head: the buffer is full, so drop the oldest sample.
	evicted := a.headIdx
	a.sum -= a.buffer[evicted].value
	a.count--
	a.headIdx = (evicted + 1) % a.maxBufferSize
}

// Average drops samples that have aged out of the window and returns the mean
// of the remaining ones, or 0 when none remain.
func (a *MovingWindowAvgImpl) Average() float64 {
	a.Lock()
	defer a.Unlock()

	a.expireOldValuesLocked()
	if n := a.count; n != 0 {
		return float64(a.sum) / float64(n)
	}
	return 0
}

// expireOldValuesLocked walks forward from the head of the circular buffer,
// removing each sample that is at least windowSize old from the running sum
// and count, and stops at the first sample still inside the window or when
// the buffer is empty. As the Locked suffix indicates, the caller is expected
// to hold the mutex.
func (a *MovingWindowAvgImpl) expireOldValuesLocked() {
	for a.headIdx != a.tailIdx {
		oldest := a.buffer[a.headIdx]
		if time.Since(oldest.timestamp) < a.windowSize {
			// Samples are stored in recording order, so the scan can stop here.
			return
		}
		a.sum -= oldest.value
		a.count--
		a.headIdx = (a.headIdx + 1) % a.maxBufferSize
	}
}
6 changes: 6 additions & 0 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,12 @@ const (
EnableEagerWorkflowStart = "system.enableEagerWorkflowStart"
// NamespaceCacheRefreshInterval is the key for namespace cache refresh interval dynamic config
NamespaceCacheRefreshInterval = "system.namespaceCacheRefreshInterval"
// PersistenceHealthSignalWindowSize is the time window size in seconds for aggregating persistence signals
PersistenceHealthSignalWindowSize = "system.persistenceHealthSignalWindowSize"
// PersistenceHealthSignalBufferSize is the maximum number of persistence signals to buffer in memory per signal key
PersistenceHealthSignalBufferSize = "system.persistenceHealthSignalBufferSize"
// ShardRPSWarnLimit is the per-shard RPS limit for warning
ShardRPSWarnLimit = "system.shardRPSWarnLimit"

// Whether the deadlock detector should dump goroutines
DeadlockDumpGoroutines = "system.deadlock.DumpGoroutines"
Expand Down
1 change: 1 addition & 0 deletions common/metrics/metric_defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1648,6 +1648,7 @@ var (
PersistenceFailures = NewCounterDef("persistence_errors")
PersistenceErrorWithType = NewCounterDef("persistence_error_with_type")
PersistenceLatency = NewTimerDef("persistence_latency")
PersistenceShardRPS = NewDimensionlessHistogramDef("persistence_shard_rps")
PersistenceErrShardExistsCounter = NewCounterDef("persistence_errors_shard_exists")
PersistenceErrShardOwnershipLostCounter = NewCounterDef("persistence_errors_shard_ownership_lost")
PersistenceErrConditionFailedCounter = NewCounterDef("persistence_errors_condition_failed")
Expand Down
47 changes: 34 additions & 13 deletions common/persistence/client/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ package client

import (
"go.temporal.io/api/serviceerror"

"go.temporal.io/server/common"
"go.temporal.io/server/common/config"
"go.temporal.io/server/common/log"
Expand Down Expand Up @@ -69,6 +68,7 @@ type (
logger log.Logger
clusterName string
ratelimiter quotas.RequestRateLimiter
healthSignals p.HealthSignalAggregator
}
)

Expand All @@ -87,6 +87,7 @@ func NewFactory(
clusterName string,
metricsHandler metrics.Handler,
logger log.Logger,
healthSignals p.HealthSignalAggregator,
) Factory {
return &factoryImpl{
dataStoreFactory: dataStoreFactory,
Expand All @@ -96,6 +97,7 @@ func NewFactory(
logger: logger,
clusterName: clusterName,
ratelimiter: ratelimiter,
healthSignals: healthSignals,
}
}

Expand All @@ -110,8 +112,9 @@ func (f *factoryImpl) NewTaskManager() (p.TaskManager, error) {
if f.ratelimiter != nil {
result = p.NewTaskPersistenceRateLimitedClient(result, f.ratelimiter, f.logger)
}
if f.metricsHandler != nil {
result = p.NewTaskPersistenceMetricsClient(result, f.metricsHandler, f.logger)
if f.metricsHandler != nil || f.healthSignals != nil {
f.updateNilMetricsAndHealthSignals()
result = p.NewTaskPersistenceMetricsClient(result, f.metricsHandler, f.healthSignals, f.logger)
}
return result, nil
}
Expand All @@ -127,8 +130,9 @@ func (f *factoryImpl) NewShardManager() (p.ShardManager, error) {
if f.ratelimiter != nil {
result = p.NewShardPersistenceRateLimitedClient(result, f.ratelimiter, f.logger)
}
if f.metricsHandler != nil {
result = p.NewShardPersistenceMetricsClient(result, f.metricsHandler, f.logger)
if f.metricsHandler != nil || f.healthSignals != nil {
f.updateNilMetricsAndHealthSignals()
result = p.NewShardPersistenceMetricsClient(result, f.metricsHandler, f.healthSignals, f.logger)
}
result = p.NewShardPersistenceRetryableClient(result, retryPolicy, IsPersistenceTransientError)
return result, nil
Expand All @@ -145,8 +149,9 @@ func (f *factoryImpl) NewMetadataManager() (p.MetadataManager, error) {
if f.ratelimiter != nil {
result = p.NewMetadataPersistenceRateLimitedClient(result, f.ratelimiter, f.logger)
}
if f.metricsHandler != nil {
result = p.NewMetadataPersistenceMetricsClient(result, f.metricsHandler, f.logger)
if f.metricsHandler != nil || f.healthSignals != nil {
f.updateNilMetricsAndHealthSignals()
result = p.NewMetadataPersistenceMetricsClient(result, f.metricsHandler, f.healthSignals, f.logger)
}
result = p.NewMetadataPersistenceRetryableClient(result, retryPolicy, IsPersistenceTransientError)
return result, nil
Expand All @@ -163,8 +168,9 @@ func (f *factoryImpl) NewClusterMetadataManager() (p.ClusterMetadataManager, err
if f.ratelimiter != nil {
result = p.NewClusterMetadataPersistenceRateLimitedClient(result, f.ratelimiter, f.logger)
}
if f.metricsHandler != nil {
result = p.NewClusterMetadataPersistenceMetricsClient(result, f.metricsHandler, f.logger)
if f.metricsHandler != nil || f.healthSignals != nil {
f.updateNilMetricsAndHealthSignals()
result = p.NewClusterMetadataPersistenceMetricsClient(result, f.metricsHandler, f.healthSignals, f.logger)
}
result = p.NewClusterMetadataPersistenceRetryableClient(result, retryPolicy, IsPersistenceTransientError)
return result, nil
Expand All @@ -181,8 +187,9 @@ func (f *factoryImpl) NewExecutionManager() (p.ExecutionManager, error) {
if f.ratelimiter != nil {
result = p.NewExecutionPersistenceRateLimitedClient(result, f.ratelimiter, f.logger)
}
if f.metricsHandler != nil {
result = p.NewExecutionPersistenceMetricsClient(result, f.metricsHandler, f.logger)
if f.metricsHandler != nil || f.healthSignals != nil {
f.updateNilMetricsAndHealthSignals()
result = p.NewExecutionPersistenceMetricsClient(result, f.metricsHandler, f.healthSignals, f.logger)
}
result = p.NewExecutionPersistenceRetryableClient(result, retryPolicy, IsPersistenceTransientError)
return result, nil
Expand All @@ -197,8 +204,9 @@ func (f *factoryImpl) NewNamespaceReplicationQueue() (p.NamespaceReplicationQueu
if f.ratelimiter != nil {
result = p.NewQueuePersistenceRateLimitedClient(result, f.ratelimiter, f.logger)
}
if f.metricsHandler != nil {
result = p.NewQueuePersistenceMetricsClient(result, f.metricsHandler, f.logger)
if f.metricsHandler != nil || f.healthSignals != nil {
f.updateNilMetricsAndHealthSignals()
result = p.NewQueuePersistenceMetricsClient(result, f.metricsHandler, f.healthSignals, f.logger)
}
result = p.NewQueuePersistenceRetryableClient(result, retryPolicy, IsPersistenceTransientError)
return p.NewNamespaceReplicationQueue(result, f.serializer, f.clusterName, f.metricsHandler, f.logger)
Expand All @@ -207,6 +215,9 @@ func (f *factoryImpl) NewNamespaceReplicationQueue() (p.NamespaceReplicationQueu
// Close closes this factory: it closes the underlying data store factory and,
// when a health signal aggregator is configured, stops it.
func (f *factoryImpl) Close() {
	f.dataStoreFactory.Close()
	if f.healthSignals != nil {
		// The aggregator is started in updateNilMetricsAndHealthSignals when
		// managers are created; shutting the factory down must stop it rather
		// than start it again.
		f.healthSignals.Stop()
	}
}

func IsPersistenceTransientError(err error) bool {
Expand All @@ -217,3 +228,13 @@ func IsPersistenceTransientError(err error) bool {

return false
}

// updateNilMetricsAndHealthSignals substitutes the no-op implementations for
// a nil metrics handler and a nil health signal aggregator, then calls Start
// on the aggregator.
//
// NOTE(review): this runs every time a manager is created through the
// factory, so Start is invoked repeatedly — presumably it is idempotent;
// confirm against the aggregator implementation.
func (f *factoryImpl) updateNilMetricsAndHealthSignals() {
	if f.healthSignals == nil {
		f.healthSignals = p.NoopHealthSignalAggregator
	}
	if f.metricsHandler == nil {
		f.metricsHandler = metrics.NoopMetricsHandler
	}
	f.healthSignals.Start()
}
3 changes: 3 additions & 0 deletions common/persistence/client/fx.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/persistence"
"go.temporal.io/server/common/persistence/serialization"
"go.temporal.io/server/common/primitives"
"go.temporal.io/server/common/quotas"
Expand All @@ -57,6 +58,7 @@ type (
ServiceName primitives.ServiceName
MetricsHandler metrics.Handler
Logger log.Logger
HealthSignals persistence.HealthSignalAggregator
}

FactoryProviderFn func(NewFactoryParams) Factory
Expand Down Expand Up @@ -97,5 +99,6 @@ func FactoryProvider(
string(params.ClusterName),
params.MetricsHandler,
params.Logger,
params.HealthSignals,
)
}
Loading