
Hot shard metrics #4365

Merged
merged 42 commits into from May 26, 2023

Changes from 33 commits (42 commits total)
0045fb7
moving window average
pdoerner May 18, 2023
ee44c22
remove channel avg impl
pdoerner May 18, 2023
68766b7
add signal aggregator
pdoerner May 18, 2023
bfc7002
adjust record fn
pdoerner May 18, 2023
82abfa4
add health signal clients
pdoerner May 19, 2023
f38f142
inject signal aggregator
pdoerner May 19, 2023
c037475
fix tests
pdoerner May 19, 2023
9543764
Merge branch 'master' into rate-limiter-metrics
pdoerner May 22, 2023
859950e
add metric emission
pdoerner May 22, 2023
e283bfb
race condition
pdoerner May 23, 2023
c43f1a1
Revert "race condition"
pdoerner May 24, 2023
dc5eb2a
Revert "add metric emission"
pdoerner May 24, 2023
217cb96
emit per shard RPS metric
pdoerner May 24, 2023
bf205a4
cleanup
pdoerner May 24, 2023
b8293dc
merge metric and signal clients
pdoerner May 24, 2023
27ec4c5
cleanup
pdoerner May 24, 2023
df989eb
cleanup
pdoerner May 24, 2023
6aa2e7f
Merge branch 'master' into rate-limiter-metrics
pdoerner May 24, 2023
f960396
linting
pdoerner May 24, 2023
3c409ce
remove generics
pdoerner May 24, 2023
fe2955d
cleanup
pdoerner May 24, 2023
f412f60
fix deferred metric fn
pdoerner May 24, 2023
3caed40
fix defer metric fn
pdoerner May 24, 2023
d64ff92
fix clients
pdoerner May 24, 2023
e91a0ee
types
pdoerner May 24, 2023
db4db37
acquire lock once
pdoerner May 24, 2023
c256fb1
locks
pdoerner May 24, 2023
98f2b66
array moving average
pdoerner May 24, 2023
f3dfa3d
Revert "array moving average"
pdoerner May 24, 2023
fc62af7
Merge branch 'master' into rate-limiter-metrics
pdoerner May 24, 2023
33448c8
cleanup
pdoerner May 24, 2023
5a94697
emit per shard RPS
pdoerner May 24, 2023
e179473
Merge branch 'master' into rate-limiter-metrics
pdoerner May 24, 2023
e0ce388
array average
pdoerner May 25, 2023
e48fcec
feedback
pdoerner May 25, 2023
d3b20f1
cleanup
pdoerner May 25, 2023
c42a921
Merge branch 'master' into rate-limiter-metrics
pdoerner May 25, 2023
b6fd862
handle nil health signals
pdoerner May 25, 2023
0bd90b0
feedback
pdoerner May 25, 2023
0911a11
allow start and stop when not enabled
pdoerner May 25, 2023
df5f722
some improvements
yycptt May 25, 2023
937c0a9
Merge branch 'master' into rate-limiter-metrics
yycptt May 26, 2023
47 changes: 47 additions & 0 deletions common/aggregate/bench_moving_window_avg_test.go
@@ -0,0 +1,47 @@
// The MIT License
//
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package aggregate

import (
	"math/rand"
	"testing"
	"time"
)

// BenchmarkRingMovingWindowAvg
// BenchmarkRingMovingWindowAvg-10 12283236 94.76 ns/op

const (
	testWindowSize = 3 * time.Second
	testBufferSize = 200
)

func BenchmarkRingMovingWindowAvg(b *testing.B) {
	avg := NewMovingWindowAvgImpl(testWindowSize, testBufferSize)
	for i := 0; i < b.N; i++ {
		avg.Record(rand.Int63())
		avg.Average()
	}
}
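The ns/op figures in the header comment come from running this benchmark with `go test -bench=BenchmarkRingMovingWindowAvg ./common/aggregate`. For orientation, here is a minimal usage sketch of the API the benchmark exercises (the sample values are made up; the constructor and methods are the ones added in this PR):

```go
package main

import (
	"fmt"
	"time"

	"go.temporal.io/server/common/aggregate"
)

func main() {
	// Window and buffer sizes mirror the benchmark constants above.
	avg := aggregate.NewMovingWindowAvgImpl(3*time.Second, 200)
	avg.Record(120) // e.g. a persistence latency sample, in milliseconds
	avg.Record(80)
	fmt.Println(avg.Average()) // prints 100 while both samples are inside the window
}
```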
111 changes: 111 additions & 0 deletions common/aggregate/moving_window_average.go
@@ -0,0 +1,111 @@
// The MIT License
//
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package aggregate

import (
	"container/ring"
	"sync"
	"time"
)

type (
	MovingWindowAverage interface {
		Record(val int64)
		Average() float64
	}

	timestampedData struct {
		value     int64
		timestamp time.Time
	}

	MovingWindowAvgImpl struct {
		sync.Mutex
		windowSize    time.Duration
		maxBufferSize int
		head          *ring.Ring
Member:

I'd want to use a ring buffer instead of a linked list. The code is a little more complicated (unless we find a library) but the memory usage patterns will be much nicer. How many of these do we expect to exist at once?

Contributor Author:

What do you mean by a ring buffer? My understanding is that a circular linked list is an example of a ring buffer. Are you looking for something that can do dynamic resizing?

Contributor:

I think David is referring to using an array to implement the ring instead of a linked list; that way we can scan a block of memory instead of chasing pointers.

Also, unless we have a good idea about the cap on maxBufferSize, it might be tricky to get this right without overflows. An alternative approach is a ring buffer where each item represents a unit of time. The size of the buffer would be our window size (in units of time); e.g., if we want the moving average over the last 60 seconds, we can have a buffer of size 60 where each item represents the average for one second (a sum and a count), and we keep track of the aggregated average. This can be a bit tricky to implement, though, since it requires some book-keeping when recording and calculating the average. We can leave this for later, only if it becomes a problem. (A sketch of this time-bucketed variant appears after this file.)

Contributor Author:

For the sake of completeness, I wrote an array-based implementation, and in our bench test it performed nearly the same (about 7 ns/op slower than ring-based). If you want to see it for yourself, it's in commit 98f2b66b9b7a711c427d5f55de82bc55d135ddfc. I see what you're saying about the contiguous memory of an array being preferable, but I'm not convinced this optimization is necessary at this point.

At the moment we are only using this to track averages for persistence latency and persistence error rate, so I do not expect to have a large number of them in memory at once. And for those metrics, we do not need a maxBufferSize that is guaranteed to fit every observation in our time window; it just needs to be sufficiently large that we get an accurate picture of persistence health.

For now I'm going to leave it as is, unless there are strong feelings to the contrary. If we need to track other kinds of averages in the future, it may be worth revisiting this and storing elements based on time as Saman suggests.

Contributor:

That's right, it all depends on the pattern in which the data structure is accessed. In the benchmark, for instance, we call Average after each Record, which means at most one array element can have expired by the time Average runs, so there is little benefit: there is no contiguous scanning of elements. If you change the benchmark to call Average every 10th time, the array has 20% lower overhead (45 ns/op vs 60 ns/op in my experiments).

So depending on the expected call pattern, this might or might not add much benefit. There is also the synchronization overhead (mutex), which can mask the benefits when the structure is updated concurrently.

Member:

Yeah, I mean an array with head and tail indexes; specifically something that can't do dynamic resizing, unlike the linked list.

I think this is one of those situations where microbenchmarks are misleading: if the whole thing fits in L1 and you're using it from a single thread, then chasing pointers is mostly free and everything will perform about the same. When either or both of those aren't true, the performance will start to diverge.

In general we have an io-bound service, so you can say CPU optimizations don't matter, which is true up to a point. If we have just a few of these, then I agree. I was assuming we'd have one per namespace or more, where it'd be a bigger concern.

Contributor Author:

Okay, I have been convinced; I swapped the linked list implementation for the array one. Seems there is no reason not to, since we will most likely get better performance.

		tail  *ring.Ring
		sum   int64
		count int64
	}
)

func NewMovingWindowAvgImpl(
	windowSize time.Duration,
	maxBufferSize int,
) *MovingWindowAvgImpl {
	buffer := ring.New(maxBufferSize)
	return &MovingWindowAvgImpl{
		windowSize:    windowSize,
		maxBufferSize: maxBufferSize,
		head:          buffer,
		tail:          buffer,
	}
}

func (a *MovingWindowAvgImpl) Record(val int64) {
	a.Lock()
	defer a.Unlock()

	if a.count == int64(a.maxBufferSize) {
		a.expireOneLocked()
	}

	a.tail.Value = timestampedData{value: val, timestamp: time.Now()}
	a.tail = a.tail.Next()

	a.sum += val
	a.count++
}

func (a *MovingWindowAvgImpl) Average() float64 {
	a.Lock()
	defer a.Unlock()

	a.expireOldValuesLocked()
	if a.count == 0 {
		return 0
	}
	return float64(a.sum) / float64(a.count)
}

func (a *MovingWindowAvgImpl) expireOldValuesLocked() {
	for ; a.head != a.tail; a.head = a.head.Next() {
		data, ok := a.head.Value.(timestampedData)
		if !ok || time.Since(data.timestamp) < a.windowSize {
			break
		}
		a.sum -= data.value
		a.count--
	}
}

func (a *MovingWindowAvgImpl) expireOneLocked() {
	if data, ok := a.head.Value.(timestampedData); ok {
		a.sum -= data.value
		a.count--
	}
	a.head = a.head.Next()
}
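The time-bucketed alternative floated in the review thread above was deferred rather than implemented. For illustration only, here is a minimal sketch of that approach, with one sum/count bucket per second of the window; the names and structure are assumptions, not code from this PR:

```go
package aggregate

import (
	"sync"
	"time"
)

// secondBucket accumulates every sample that arrived within one wall-clock second.
type secondBucket struct {
	sum   int64
	count int64
	sec   int64 // unix second this bucket currently holds
}

// bucketedAvg keeps one bucket per second over a fixed window, so memory is
// bounded by the window length rather than by the sample rate.
type bucketedAvg struct {
	mu      sync.Mutex
	buckets []secondBucket
}

func newBucketedAvg(windowSeconds int) *bucketedAvg {
	return &bucketedAvg{buckets: make([]secondBucket, windowSeconds)}
}

func (b *bucketedAvg) Record(val int64) {
	b.mu.Lock()
	defer b.mu.Unlock()
	now := time.Now().Unix()
	slot := &b.buckets[now%int64(len(b.buckets))]
	if slot.sec != now { // slot still holds data from a previous cycle; reset it
		slot.sum, slot.count, slot.sec = 0, 0, now
	}
	slot.sum += val
	slot.count++
}

func (b *bucketedAvg) Average() float64 {
	b.mu.Lock()
	defer b.mu.Unlock()
	now := time.Now().Unix()
	var sum, count int64
	for _, s := range b.buckets {
		if now-s.sec < int64(len(b.buckets)) { // bucket is inside the window
			sum += s.sum
			count += s.count
		}
	}
	if count == 0 {
		return 0
	}
	return float64(sum) / float64(count)
}
```

Unlike the sample-capped buffer merged here, this never drops observations under load, at the cost of one-second granularity at the window edge.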
6 changes: 6 additions & 0 deletions common/dynamicconfig/constants.go
@@ -106,6 +106,12 @@ const (
 	EnableEagerWorkflowStart = "system.enableEagerWorkflowStart"
 	// NamespaceCacheRefreshInterval is the key for namespace cache refresh interval dynamic config
 	NamespaceCacheRefreshInterval = "system.namespaceCacheRefreshInterval"
+	// PersistenceHealthSignalWindowSize is the time window size in seconds for aggregating persistence signals
+	PersistenceHealthSignalWindowSize = "system.persistenceHealthSignalWindowSize"
+	// PersistenceHealthSignalBufferSize is the maximum number of persistence signals to buffer in memory per signal key
+	PersistenceHealthSignalBufferSize = "system.persistenceHealthSignalBufferSize"
+	// ShardRPSWarnLimit is the per-shard RPS limit for warning
+	ShardRPSWarnLimit = "system.shardRPSWarnLimit"
 
 	// Whether the deadlock detector should dump goroutines
 	DeadlockDumpGoroutines = "system.deadlock.DumpGoroutines"
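These keys would be read through the server's dynamic config collection before being handed to the aggregator. A rough sketch, assuming the GetDurationProperty/GetIntProperty accessors the collection exposed at the time; the accessor names and default values are assumptions, not taken from this diff:

```go
// newPersistenceHealthAverage is a hypothetical wiring helper, shown only to
// illustrate how the new dynamic config keys feed the moving window average.
func newPersistenceHealthAverage(dc *dynamicconfig.Collection) aggregate.MovingWindowAverage {
	windowSize := dc.GetDurationProperty(dynamicconfig.PersistenceHealthSignalWindowSize, 10*time.Second)
	bufferSize := dc.GetIntProperty(dynamicconfig.PersistenceHealthSignalBufferSize, 5000)
	return aggregate.NewMovingWindowAvgImpl(windowSize(), bufferSize())
}
```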
1 change: 1 addition & 0 deletions common/metrics/metric_defs.go
@@ -1648,6 +1648,7 @@ var (
 	PersistenceFailures                     = NewCounterDef("persistence_errors")
 	PersistenceErrorWithType                = NewCounterDef("persistence_error_with_type")
 	PersistenceLatency                      = NewTimerDef("persistence_latency")
+	PersistenceShardRPS                     = NewDimensionlessHistogramDef("persistence_shard_rps")
 	PersistenceErrShardExistsCounter        = NewCounterDef("persistence_errors_shard_exists")
 	PersistenceErrShardOwnershipLostCounter = NewCounterDef("persistence_errors_shard_ownership_lost")
 	PersistenceErrConditionFailedCounter    = NewCounterDef("persistence_errors_condition_failed")
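A dimensionless histogram like this is recorded through the metrics handler. Roughly, and assuming the Histogram accessor and metric definition methods used elsewhere in the codebase (method names here are from memory, not from this diff):

```go
// rps would be computed per shard by the metrics client; the accessor
// method names below are assumptions based on the surrounding codebase.
var rps int64 = 42
handler.Histogram(
	metrics.PersistenceShardRPS.GetMetricName(),
	metrics.PersistenceShardRPS.GetMetricUnit(),
).Record(rps)
```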
58 changes: 45 additions & 13 deletions common/persistence/client/factory.go
@@ -26,7 +26,6 @@ package client
 
 import (
 	"go.temporal.io/api/serviceerror"
-
 	"go.temporal.io/server/common"
 	"go.temporal.io/server/common/config"
 	"go.temporal.io/server/common/log"
@@ -69,6 +68,7 @@
 		logger        log.Logger
 		clusterName   string
 		ratelimiter   quotas.RequestRateLimiter
+		healthSignals p.HealthSignalAggregator
 	}
 )

@@ -87,6 +87,7 @@ func NewFactory(
 	clusterName string,
 	metricsHandler metrics.Handler,
 	logger log.Logger,
+	healthSignals p.HealthSignalAggregator,
 ) Factory {
 	return &factoryImpl{
 		dataStoreFactory: dataStoreFactory,
@@ -96,6 +97,7 @@
 		logger:           logger,
 		clusterName:      clusterName,
 		ratelimiter:      ratelimiter,
+		healthSignals:    healthSignals,
 	}
 }

@@ -110,8 +112,13 @@ func (f *factoryImpl) NewTaskManager() (p.TaskManager, error) {
 	if f.ratelimiter != nil {
 		result = p.NewTaskPersistenceRateLimitedClient(result, f.ratelimiter, f.logger)
 	}
-	if f.metricsHandler != nil {
-		result = p.NewTaskPersistenceMetricsClient(result, f.metricsHandler, f.logger)
+	if f.metricsHandler != nil || f.healthSignals != nil {
+		if f.metricsHandler == nil {
+			f.metricsHandler = metrics.NoopMetricsHandler
+		} else if f.healthSignals == nil {
+			f.healthSignals = p.NoopHealthSignalAggregator
+		}
+		result = p.NewTaskPersistenceMetricsClient(result, f.metricsHandler, f.healthSignals, f.logger)
 	}
 	return result, nil
 }
@@ -127,8 +134,13 @@ func (f *factoryImpl) NewShardManager() (p.ShardManager, error) {
 	if f.ratelimiter != nil {
 		result = p.NewShardPersistenceRateLimitedClient(result, f.ratelimiter, f.logger)
 	}
-	if f.metricsHandler != nil {
-		result = p.NewShardPersistenceMetricsClient(result, f.metricsHandler, f.logger)
+	if f.metricsHandler != nil || f.healthSignals != nil {
+		if f.metricsHandler == nil {
+			f.metricsHandler = metrics.NoopMetricsHandler
+		} else if f.healthSignals == nil {
+			f.healthSignals = p.NoopHealthSignalAggregator
+		}
+		result = p.NewShardPersistenceMetricsClient(result, f.metricsHandler, f.healthSignals, f.logger)
 	}
 	result = p.NewShardPersistenceRetryableClient(result, retryPolicy, IsPersistenceTransientError)
 	return result, nil
@@ -145,8 +157,13 @@ func (f *factoryImpl) NewMetadataManager() (p.MetadataManager, error) {
 	if f.ratelimiter != nil {
 		result = p.NewMetadataPersistenceRateLimitedClient(result, f.ratelimiter, f.logger)
 	}
-	if f.metricsHandler != nil {
-		result = p.NewMetadataPersistenceMetricsClient(result, f.metricsHandler, f.logger)
+	if f.metricsHandler != nil || f.healthSignals != nil {
+		if f.metricsHandler == nil {
+			f.metricsHandler = metrics.NoopMetricsHandler
+		} else if f.healthSignals == nil {
+			f.healthSignals = p.NoopHealthSignalAggregator
+		}
+		result = p.NewMetadataPersistenceMetricsClient(result, f.metricsHandler, f.healthSignals, f.logger)
 	}
 	result = p.NewMetadataPersistenceRetryableClient(result, retryPolicy, IsPersistenceTransientError)
 	return result, nil
@@ -163,8 +180,13 @@ func (f *factoryImpl) NewClusterMetadataManager() (p.ClusterMetadataManager, error) {
 	if f.ratelimiter != nil {
 		result = p.NewClusterMetadataPersistenceRateLimitedClient(result, f.ratelimiter, f.logger)
 	}
-	if f.metricsHandler != nil {
-		result = p.NewClusterMetadataPersistenceMetricsClient(result, f.metricsHandler, f.logger)
+	if f.metricsHandler != nil || f.healthSignals != nil {
+		if f.metricsHandler == nil {
+			f.metricsHandler = metrics.NoopMetricsHandler
+		} else if f.healthSignals == nil {
+			f.healthSignals = p.NoopHealthSignalAggregator
+		}
+		result = p.NewClusterMetadataPersistenceMetricsClient(result, f.metricsHandler, f.healthSignals, f.logger)
 	}
 	result = p.NewClusterMetadataPersistenceRetryableClient(result, retryPolicy, IsPersistenceTransientError)
 	return result, nil
@@ -181,8 +203,13 @@ func (f *factoryImpl) NewExecutionManager() (p.ExecutionManager, error) {
 	if f.ratelimiter != nil {
 		result = p.NewExecutionPersistenceRateLimitedClient(result, f.ratelimiter, f.logger)
 	}
-	if f.metricsHandler != nil {
-		result = p.NewExecutionPersistenceMetricsClient(result, f.metricsHandler, f.logger)
+	if f.metricsHandler != nil || f.healthSignals != nil {
+		if f.metricsHandler == nil {
+			f.metricsHandler = metrics.NoopMetricsHandler
+		} else if f.healthSignals == nil {
+			f.healthSignals = p.NoopHealthSignalAggregator
+		}
+		result = p.NewExecutionPersistenceMetricsClient(result, f.metricsHandler, f.healthSignals, f.logger)
 	}
 	result = p.NewExecutionPersistenceRetryableClient(result, retryPolicy, IsPersistenceTransientError)
 	return result, nil
@@ -197,8 +224,13 @@ func (f *factoryImpl) NewNamespaceReplicationQueue() (p.NamespaceReplicationQueue, error) {
 	if f.ratelimiter != nil {
 		result = p.NewQueuePersistenceRateLimitedClient(result, f.ratelimiter, f.logger)
 	}
-	if f.metricsHandler != nil {
-		result = p.NewQueuePersistenceMetricsClient(result, f.metricsHandler, f.logger)
+	if f.metricsHandler != nil || f.healthSignals != nil {
+		if f.metricsHandler == nil {
+			f.metricsHandler = metrics.NoopMetricsHandler
+		} else if f.healthSignals == nil {
+			f.healthSignals = p.NoopHealthSignalAggregator
+		}
+		result = p.NewQueuePersistenceMetricsClient(result, f.metricsHandler, f.healthSignals, f.logger)
 	}
 	result = p.NewQueuePersistenceRetryableClient(result, retryPolicy, IsPersistenceTransientError)
 	return p.NewNamespaceReplicationQueue(result, f.serializer, f.clusterName, f.metricsHandler, f.logger)
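The nil-fallback block above is repeated verbatim in all six manager constructors. A hypothetical helper that would collapse the duplication; this is a sketch built from the code in this diff, not part of the PR:

```go
// ensureMetricsDeps reports whether the metrics client wrapper should be
// installed, substituting noop implementations so that enabling either
// metrics or health signals alone is sufficient.
func (f *factoryImpl) ensureMetricsDeps() bool {
	if f.metricsHandler == nil && f.healthSignals == nil {
		return false
	}
	if f.metricsHandler == nil {
		f.metricsHandler = metrics.NoopMetricsHandler
	}
	if f.healthSignals == nil {
		f.healthSignals = p.NoopHealthSignalAggregator
	}
	return true
}
```

Each constructor would then reduce to `if f.ensureMetricsDeps() { result = p.NewTaskPersistenceMetricsClient(result, f.metricsHandler, f.healthSignals, f.logger) }` and so on for the other managers.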
3 changes: 3 additions & 0 deletions common/persistence/client/fx.go
@@ -25,6 +25,7 @@
 package client
 
 import (
+	"go.temporal.io/server/common/persistence"
 	"go.uber.org/fx"
 
 	"go.temporal.io/server/common/cluster"
@@ -57,6 +58,7 @@
 		ServiceName    primitives.ServiceName
 		MetricsHandler metrics.Handler
 		Logger         log.Logger
+		HealthSignals  persistence.HealthSignalAggregator
 	}
 
 	FactoryProviderFn func(NewFactoryParams) Factory
@@ -97,5 +99,6 @@ func FactoryProvider(
 		string(params.ClusterName),
 		params.MetricsHandler,
 		params.Logger,
+		params.HealthSignals,
 	)
 }
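With the new NewFactoryParams field, the aggregator reaches the factory through fx dependency injection. A minimal sketch of a provider, assuming the noop aggregator referenced in factory.go; the module name and provider body are placeholders, since the PR's real provider is not shown in this diff:

```go
// HealthSignalModule is a hypothetical fx module; a real deployment would
// construct the windowed aggregator here instead of returning the noop.
var HealthSignalModule = fx.Options(
	fx.Provide(func() persistence.HealthSignalAggregator {
		return persistence.NoopHealthSignalAggregator
	}),
)
```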