Add global-ratelimiter aggregator-side metrics (#6171)
It would've been useful to have some of these while checking the initial rollout and troubleshooting, and the rest seem possibly-also-useful and easy to collect.

I've also adjusted the histogram buckets shared by other global-ratelimiter metrics - nothing planned is actually sensitive to the exact values, so this should be harmless. It's more for general "wait, why do we have 100x more/fewer than we expected?"-style discoveries.
Groxx authored Jul 18, 2024
1 parent c9bbbe5 commit 4a51fb4
Showing 8 changed files with 253 additions and 71 deletions.
52 changes: 36 additions & 16 deletions common/metrics/defs.go
@@ -840,8 +840,10 @@ const (
// TaskValidatorScope is the metric for the taskvalidator's workflow check operation.
TaskValidatorScope

// FrontendGlobalRatelimiter is the metrics scope for frontend.GlobalRatelimiter
FrontendGlobalRatelimiter
// GlobalRatelimiter is the metrics scope for limiting-side common/quotas/global behavior
GlobalRatelimiter
// GlobalRatelimiterAggregator is the metrics scope for aggregator-side common/quotas/global behavior
GlobalRatelimiterAggregator

NumCommonScopes
)
@@ -1729,7 +1731,8 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
HashringScope: {operation: "Hashring"},

// currently used by both frontend and history, but may grow to other limiting-host-services.
FrontendGlobalRatelimiter: {operation: "GlobalRatelimiter"},
GlobalRatelimiter: {operation: "GlobalRatelimiter"},
GlobalRatelimiterAggregator: {operation: "GlobalRatelimiterAggregator"},
},
// Frontend Scope Names
Frontend: {
@@ -2204,14 +2207,24 @@ const (

AsyncRequestPayloadSize

// limiter-side metrics
GlobalRatelimiterStartupUsageHistogram
GlobalRatelimiterFailingUsageHistogram
GlobalRatelimiterGlobalUsageHistogram
GlobalRatelimiterUpdateLatency // time spent performing all Update requests, per batch attempt. ideally well below update interval.

GlobalRatelimiterUpdateLatency // time spent performing all Update requests, per batch attempt. ideally well below update interval.
GlobalRatelimiterAllowedRequestsCount // per key/type usage
GlobalRatelimiterRejectedRequestsCount // per key/type usage
GlobalRatelimiterQuota // per-global-key quota information, emitted when a key is in use
GlobalRatelimiterQuota // per-global-key quota information, emitted when a key is in use

// aggregator-side metrics
GlobalRatelimiterInitialized
GlobalRatelimiterReinitialized
GlobalRatelimiterUpdated
GlobalRatelimiterDecayed
GlobalRatelimiterLimitsQueried
GlobalRatelimiterHostLimitsQueried
GlobalRatelimiterRemovedLimits
GlobalRatelimiterRemovedHostLimits

NumCommonMetrics // Needs to be last on this list for iota numbering
)
@@ -2866,10 +2879,18 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
GlobalRatelimiterFailingUsageHistogram: {metricName: "global_ratelimiter_failing_usage_histogram", metricType: Histogram, buckets: GlobalRatelimiterUsageHistogram},
GlobalRatelimiterGlobalUsageHistogram: {metricName: "global_ratelimiter_global_usage_histogram", metricType: Histogram, buckets: GlobalRatelimiterUsageHistogram},
GlobalRatelimiterUpdateLatency: {metricName: "global_ratelimiter_update_latency", metricType: Timer},

GlobalRatelimiterAllowedRequestsCount: {metricName: "global_ratelimiter_allowed_requests", metricType: Counter},
GlobalRatelimiterRejectedRequestsCount: {metricName: "global_ratelimiter_rejected_requests", metricType: Counter},
GlobalRatelimiterQuota: {metricName: "global_ratelimiter_quota", metricType: Gauge},

GlobalRatelimiterInitialized: {metricName: "global_ratelimiter_initialized", metricType: Histogram, buckets: GlobalRatelimiterUsageHistogram},
GlobalRatelimiterReinitialized: {metricName: "global_ratelimiter_reinitialized", metricType: Histogram, buckets: GlobalRatelimiterUsageHistogram},
GlobalRatelimiterUpdated: {metricName: "global_ratelimiter_updated", metricType: Histogram, buckets: GlobalRatelimiterUsageHistogram},
GlobalRatelimiterDecayed: {metricName: "global_ratelimiter_decayed", metricType: Histogram, buckets: GlobalRatelimiterUsageHistogram},
GlobalRatelimiterLimitsQueried: {metricName: "global_ratelimiter_limits_queried", metricType: Histogram, buckets: GlobalRatelimiterUsageHistogram},
GlobalRatelimiterHostLimitsQueried: {metricName: "global_ratelimiter_host_limits_queried", metricType: Histogram, buckets: GlobalRatelimiterUsageHistogram},
GlobalRatelimiterRemovedLimits: {metricName: "global_ratelimiter_removed_limits", metricType: Histogram, buckets: GlobalRatelimiterUsageHistogram},
GlobalRatelimiterRemovedHostLimits: {metricName: "global_ratelimiter_removed_host_limits", metricType: Histogram, buckets: GlobalRatelimiterUsageHistogram},
},
History: {
TaskRequests: {metricName: "task_requests", metricType: Counter},
@@ -3331,15 +3352,14 @@ var PersistenceLatencyBuckets = tally.DurationBuckets([]time.Duration{
})

// GlobalRatelimiterUsageHistogram contains buckets for tracking how many ratelimiters are
// in which state (startup, healthy, failing).
var GlobalRatelimiterUsageHistogram = tally.ValueBuckets{
0, // need an explicit 0 to record zeros
1, 2, 5, 10,
25, 50, 100,
250, 500, 1000,
1250, 1500, 2000,
// TODO: almost certainly want more, but how many?
}
// in which various states (startup, healthy, failing, as well as aggregator-side quantities, deleted, etc).
//
// this is intended for coarse scale checking, not alerting, so the buckets
// should be considered unstable and can be changed whenever desired.
var GlobalRatelimiterUsageHistogram = append(
tally.ValueBuckets{0}, // need an explicit 0 or zero is reported as 1
tally.MustMakeExponentialValueBuckets(1, 2, 17)..., // 1..65536
)
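
As a quick sanity check on the adjusted buckets, here is a minimal sketch (assuming the v3 uber-go/tally import path this file already uses) that prints the boundaries the new definition expands to:

package main

import (
	"fmt"

	"github.com/uber-go/tally"
)

func main() {
	// same construction as GlobalRatelimiterUsageHistogram above:
	// an explicit 0 bucket, then 17 exponential buckets doubling from 1.
	buckets := append(
		tally.ValueBuckets{0},
		tally.MustMakeExponentialValueBuckets(1, 2, 17)...,
	)
	fmt.Println(buckets)
	// 18 boundaries total: 0, then 1, 2, 4, ... doubling up to 65536 (2^16)
}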

// ErrorClass is an enum to help with classifying SLA vs. non-SLA errors (SLA = "service level agreement")
type ErrorClass uint8
56 changes: 29 additions & 27 deletions common/metrics/tags.go
@@ -34,37 +34,39 @@ const (
goVersionTag = "go_version"
cadenceVersionTag = "cadence_version"

instance = "instance"
domain = "domain"
domainType = "domain_type"
clusterGroup = "cluster_group"
sourceCluster = "source_cluster"
targetCluster = "target_cluster"
activeCluster = "active_cluster"
taskList = "tasklist"
taskListType = "tasklistType"
workflowType = "workflowType"
activityType = "activityType"
decisionType = "decisionType"
invariantType = "invariantType"
shardScannerScanResult = "shardscanner_scan_result"
shardScannerFixResult = "shardscanner_fix_result"
kafkaPartition = "kafkaPartition"
transport = "transport"
caller = "caller"
service = "service"
signalName = "signalName"
workflowVersion = "workflow_version"
shardID = "shard_id"
matchingHost = "matching_host"
host = "host"
pollerIsolationGroup = "poller_isolation_group"
asyncWFRequestType = "async_wf_request_type"
instance = "instance"
domain = "domain"
domainType = "domain_type"
clusterGroup = "cluster_group"
sourceCluster = "source_cluster"
targetCluster = "target_cluster"
activeCluster = "active_cluster"
taskList = "tasklist"
taskListType = "tasklistType"
workflowType = "workflowType"
activityType = "activityType"
decisionType = "decisionType"
invariantType = "invariantType"
shardScannerScanResult = "shardscanner_scan_result"
shardScannerFixResult = "shardscanner_fix_result"
kafkaPartition = "kafkaPartition"
transport = "transport"
caller = "caller"
service = "service"
signalName = "signalName"
workflowVersion = "workflow_version"
shardID = "shard_id"
matchingHost = "matching_host"
host = "host"
pollerIsolationGroup = "poller_isolation_group"
asyncWFRequestType = "async_wf_request_type"
workflowTerminationReason = "workflow_termination_reason"

// limiter-side tags
globalRatelimitKey = "global_ratelimit_key"
globalRatelimitType = "global_ratelimit_type"
globalRatelimitIsPrimary = "is_primary"
globalRatelimitCollectionName = "global_ratelimit_collection"
workflowTerminationReason = "workflow_termination_reason"

allValue = "all"
unknownValue = "_unknown_"
73 changes: 64 additions & 9 deletions common/quotas/global/algorithm/requestweighted.go
@@ -130,6 +130,7 @@ import (

"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/dynamicconfig"
"github.com/uber/cadence/common/metrics"
)

type (
@@ -154,7 +155,8 @@ type (
impl struct {
// intentionally value-typed so caller cannot mutate the fields.
// manually copy data if this changes.
cfg Config
cfg Config
scope metrics.Scope

// mut protects usage, as it is the only mutable data
mut sync.Mutex
@@ -349,9 +351,10 @@ func (c configSnapshot) missedUpdateScalar(dataAge time.Duration) PerSecond {
//
// This instance is effectively single-threaded, but a small sharding wrapper should allow better concurrent
// throughput if needed (bound by CPU cores, as it's moderately CPU-costly).
func New(cfg Config) (RequestWeighted, error) {
func New(met metrics.Client, cfg Config) (RequestWeighted, error) {
i := &impl{
cfg: cfg,
scope: met.Scope(metrics.GlobalRatelimiterAggregator),
usage: make(map[Limit]map[Identity]requests, guessNumKeys), // start out relatively large

clock: clock.NewRealTimeSource(),
@@ -369,7 +372,10 @@ func (a *impl) Update(p UpdateParams) error {
return fmt.Errorf("bad args to update: %w", err)
}
a.mut.Lock()
defer a.mut.Unlock()
once := newOnce()
defer once.Do(a.mut.Unlock)

var initialized, reinitialized, updated, decayed int64

snap, err := a.snapshot()
if err != nil {
@@ -386,6 +392,7 @@
aps := PerSecond(float64(req.Accepted) / float64(p.Elapsed/time.Second))
rps := PerSecond(float64(req.Rejected) / float64(p.Elapsed/time.Second))
if prev.lastUpdate.IsZero() {
initialized++
next = requests{
lastUpdate: snap.now,
accepted: aps, // no requests == 100% weight
@@ -394,15 +401,20 @@
} else {
age := snap.now.Sub(prev.lastUpdate)
if snap.shouldGC(age) {
reinitialized++
// would have GC'd if we had seen it earlier, so it's the same as the zero state
next = requests{
lastUpdate: snap.now,
accepted: aps, // no requests == 100% weight
rejected: rps, // no requests == 100% weight
}
} else {
updated++
// compute the next rolling average step (`*reduce` simulates skipped updates)
reduce := snap.missedUpdateScalar(age)
if reduce < 1 {
decayed++
}
next = requests{
lastUpdate: snap.now,
// TODO: max(1, actual) so this does not lead to <1 rps allowances? or maybe just 1+actual and then reduce in used-responses?
@@ -418,14 +430,21 @@
a.usage[key] = ih
}

once.Do(a.mut.Unlock) // don't hold the lock while emitting metrics

a.scope.RecordHistogramValue(metrics.GlobalRatelimiterInitialized, float64(initialized))
a.scope.RecordHistogramValue(metrics.GlobalRatelimiterReinitialized, float64(reinitialized))
a.scope.RecordHistogramValue(metrics.GlobalRatelimiterUpdated, float64(updated))
a.scope.RecordHistogramValue(metrics.GlobalRatelimiterDecayed, float64(decayed))

return nil
}

// getWeightsLocked returns the weights of observed hosts (based on ALL requests), and the total number of requests accepted per second.
func (a *impl) getWeightsLocked(key Limit, snap configSnapshot) (weights map[Identity]HostWeight, usedRPS PerSecond) {
func (a *impl) getWeightsLocked(key Limit, snap configSnapshot) (weights map[Identity]HostWeight, usedRPS PerSecond, met Metrics) {
ir := a.usage[key]
if len(ir) == 0 {
return nil, 0
return nil, 0, met
}

weights = make(map[Identity]HostWeight, len(ir))
@@ -436,6 +455,7 @@
if snap.shouldGC(age) {
// old, clean up
delete(ir, id)
met.RemovedHostLimits++
continue
}

@@ -445,25 +465,31 @@
weights[id] = actual // populate with the reduced values so it doesn't have to be calculated again
total += actual // keep a running total to scale all values when done
usedRPS += reqs.accepted * reduce
met.HostLimits++
}

if len(ir) == 0 {
// completely empty Limit, gc it as well
delete(a.usage, key)
return nil, 0
met.RemovedLimits++
return nil, 0, met
}

for id := range ir {
// scale by the total.
// this also ensures all values are between 0 and 1 (inclusive)
weights[id] = weights[id] / total
}
return weights, usedRPS
met.Limits = 1
return weights, usedRPS, met
}

func (a *impl) HostWeights(host Identity, limits []Limit) (weights map[Limit]HostWeight, usedRPS map[Limit]PerSecond, err error) {
a.mut.Lock()
defer a.mut.Unlock()
once := newOnce()
defer once.Do(a.mut.Unlock)

var cumulative Metrics

weights = make(map[Limit]HostWeight, len(limits))
usedRPS = make(map[Limit]PerSecond, len(limits))
@@ -472,14 +498,28 @@
return nil, nil, err
}
for _, lim := range limits {
hosts, used := a.getWeightsLocked(lim, snap)
hosts, used, met := a.getWeightsLocked(lim, snap)

cumulative.Limits += met.Limits // always 1 or 0
cumulative.HostLimits += met.HostLimits
cumulative.RemovedLimits += met.RemovedLimits // always 0 or 1 (opposite Limits)
cumulative.RemovedHostLimits += met.RemovedHostLimits

if len(hosts) > 0 {
usedRPS[lim] = used // limit is known, has some usage
if weight, ok := hosts[host]; ok {
weights[lim] = weight // host has a known weight
}
}
}

once.Do(a.mut.Unlock) // don't hold the lock while emitting metrics

a.scope.RecordHistogramValue(metrics.GlobalRatelimiterLimitsQueried, float64(cumulative.Limits))
a.scope.RecordHistogramValue(metrics.GlobalRatelimiterHostLimitsQueried, float64(cumulative.HostLimits))
a.scope.RecordHistogramValue(metrics.GlobalRatelimiterRemovedLimits, float64(cumulative.RemovedLimits))
a.scope.RecordHistogramValue(metrics.GlobalRatelimiterRemovedHostLimits, float64(cumulative.RemovedHostLimits))

return weights, usedRPS, nil
}
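
The Metrics value threaded through getWeightsLocked and HostWeights above is defined elsewhere in this file, outside the hunks shown here; a rough sketch of its assumed shape, inferred purely from the field accesses above:

// sketch only: field names and int types are inferred from the usage above,
// not copied from the actual definition in requestweighted.go.
type Metrics struct {
	Limits            int // limits that still held data after GC (1 or 0 per getWeightsLocked call)
	HostLimits        int // per-host entries that contributed weight for this limit
	RemovedLimits     int // limits garbage-collected because every host entry aged out
	RemovedHostLimits int // per-host entries garbage-collected due to age
}

HostWeights sums these per-limit values into cumulative and emits them as histograms once per call, after releasing the lock.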

@@ -538,3 +578,18 @@ func weighted[T numeric](newer, older T, weight float64) T {
type numeric interface {
constraints.Integer | constraints.Float
}

// non-sync version of sync.Once, for easier unlocking
type doOnce bool

func newOnce() *doOnce {
value := doOnce(false)
return &value
}

func (o *doOnce) Do(cb func()) {
if *o == false {
*o = true
cb()
}
}
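
For context, a hypothetical sketch of the lock-then-emit pattern this helper enables (the method name and body are invented for illustration; Update and HostWeights above show the real usage):

func (a *impl) exampleLockedWork() {
	a.mut.Lock()
	once := newOnce()
	defer once.Do(a.mut.Unlock) // safety net: still unlocks on an early return or panic

	// ... read and mutate a.usage while holding the lock ...

	once.Do(a.mut.Unlock) // release early; the deferred call becomes a no-op
	// ... emit metrics without holding the lock ...
}

Unlike sync.Once this is not safe for concurrent use, but each value is local to a single call, so no synchronization is needed.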
(diffs for the remaining 5 changed files not shown)
